From d4f012bbccabf40aadb842db9802d9668845b669 Mon Sep 17 00:00:00 2001 From: qyx <565485304@qq.com> Date: Sat, 12 Sep 2020 18:39:37 +0800 Subject: [PATCH] =?UTF-8?q?AB=E5=8D=95=E5=88=9B=E5=BB=BA=E9=80=BB=E8=BE=91?= =?UTF-8?q?=E6=B5=8B=E8=AF=95=E5=AE=8C=E6=88=90?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../java/me/zhengjie/common/ResponseCode.java | 4 +- .../config/thread/ThreadPoolConfig.java | 117 ++++++++++++++++++ .../abmessage/service/AbMessageService.java | 9 ++ .../service/impl/AbMessageServiceImpl.java | 33 +++-- .../form/service/FormMessageService.java | 4 +- .../service/impl/FormMessageServiceImpl.java | 4 +- .../repository/TaskRecordRepository.java | 2 + .../taskrecord/rest/TaskRecordController.java | 29 ++++- .../taskrecord/rest/vo/TaskRecordBuildVO.java | 2 +- .../service/impl/TaskRecordServiceImpl.java | 4 +- .../zhengjie/task/MergeRecordFilesTask.java | 4 +- .../zhengjie/task/ProduceLocalFileTask.java | 98 ++++++++++----- .../java/me/zhengjie/task/SendRecordTask.java | 4 +- .../main/resources/config/application-dev.yml | 22 ++++ .../resources/config/application-prod.yml | 26 ++++ .../test/java/me/zhengjie/SpringJPATest.java | 72 +++++++++++ .../src/test/java/me/zhengjie/TempTest.java | 9 +- 17 files changed, 380 insertions(+), 63 deletions(-) create mode 100644 eladmin-system/src/main/java/me/zhengjie/config/thread/ThreadPoolConfig.java diff --git a/eladmin-system/src/main/java/me/zhengjie/common/ResponseCode.java b/eladmin-system/src/main/java/me/zhengjie/common/ResponseCode.java index e6a8baf..7c50eb6 100644 --- a/eladmin-system/src/main/java/me/zhengjie/common/ResponseCode.java +++ b/eladmin-system/src/main/java/me/zhengjie/common/ResponseCode.java @@ -14,10 +14,12 @@ public enum ResponseCode { SUCCESS(0,"SUCCESS"), ERROR(1,"ERROR"), - // 请求参数校验 + // 通用请求参数校验 ILLEGAL_ARGUMENT(1,"请求参数格式错误"), EMPTY_ARGUMENT(1,"请求参数为空"), NO_MATCH_ARGUMENT_SET(1,"不能满足要求的参数设置"), + // 特殊需要进行前端返回说明的参数定义 + TASK_NAME_IS_EXIST(1,"任务名称已经存在"), // 请求结果性的错误 NODATA_ERROR(1,"查询结果为空"), TASK_BUILD_ERROR(1,"任务建立失败"), diff --git a/eladmin-system/src/main/java/me/zhengjie/config/thread/ThreadPoolConfig.java b/eladmin-system/src/main/java/me/zhengjie/config/thread/ThreadPoolConfig.java new file mode 100644 index 0000000..2b40675 --- /dev/null +++ b/eladmin-system/src/main/java/me/zhengjie/config/thread/ThreadPoolConfig.java @@ -0,0 +1,117 @@ +package me.zhengjie.config.thread; + +import org.apache.tomcat.util.threads.ThreadPoolExecutor; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.scheduling.annotation.EnableAsync; + +import java.util.concurrent.Executor; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingDeque; +import java.util.concurrent.TimeUnit; + +/** + * 线程池配置总类,用于配置线程池 + */ +@Configuration +@EnableAsync +public class ThreadPoolConfig { + + /** + * 从配置文件中加载线程池相关的配置 + */ + + /** + * produce + */ + @Value(value = "${produce.task.thread_pool.corePoolSize}") + private int produceCorePoolSize = 2; + @Value(value = "${produce.task.thread_pool.maxPoolSize}") + private int produceMaxPoolSize = 16; + @Value(value = "${produce.task.thread_pool.queueCapacity}") + private int produceQueueCapacity = 3; + @Value(value = "${produce.task.thread_pool.ThreadNamePrefix}") + private String produceThreadNamePrefix = "ProduceLocalFileTaskExecutor-"; // fixme 这个属性暂时没用起来 + + /** + * produce + */ + @Value(value = "${send.task.thread_pool.corePoolSize}") + private int sendCorePoolSize = 2; + @Value(value = "${send.task.thread_pool.maxPoolSize}") + private int sendMaxPoolSize = 16; + @Value(value = "${send.task.thread_pool.queueCapacity}") + private int sendQueueCapacity = 3; + @Value(value = "${send.task.thread_pool.ThreadNamePrefix}") + private String sendThreadNamePrefix = "ProduceLocalFileTaskExecutor-"; // fixme 这个属性暂时没用起来 + + /** + * produce + */ + @Value(value = "${merge.task.thread_pool.corePoolSize}") + private int mergeCorePoolSize = 2; + @Value(value = "${merge.task.thread_pool.maxPoolSize}") + private int mergeMaxPoolSize = 16; + @Value(value = "${merge.task.thread_pool.queueCapacity}") + private int mergeQueueCapacity = 3; + @Value(value = "${merge.task.thread_pool.ThreadNamePrefix}") + private String mergeThreadNamePrefix = "ProduceLocalFileTaskExecutor-"; // fixme 这个属性暂时没用起来 + + /** + * 生成本地课包任务 - 异步线程池的配置类 + * + * @return + */ + @Bean(value = "ProduceLocalFileTaskExecutor") + public Executor produceLocalFileTaskExecutor() { + ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor( + produceCorePoolSize, + produceMaxPoolSize, + 3, + TimeUnit.SECONDS, + new LinkedBlockingDeque<>(produceQueueCapacity), + Executors.defaultThreadFactory(), + new ThreadPoolExecutor.DiscardOldestPolicy() + ); + return threadPoolExecutor; + } + + /** + * 发送本地课包任务 - 异步线程池的配置类 + * + * @return + */ + @Bean(value = "SendRecordTaskExecutor") + public Executor sendRecordTaskExecutor() { + ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor( + sendCorePoolSize, + sendMaxPoolSize, + 3, + TimeUnit.SECONDS, + new LinkedBlockingDeque<>(sendQueueCapacity), + Executors.defaultThreadFactory(), + new ThreadPoolExecutor.DiscardOldestPolicy() + ); + return threadPoolExecutor; + } + + /** + * 合并本地课包任务 - 异步线程池的配置类 + * @return + */ + @Bean(value = "MergeRecordTaskExecutor") + public Executor mergeRecordTaskExecutor() { + ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor( + mergeCorePoolSize, + mergeMaxPoolSize, + 3, + TimeUnit.SECONDS, + new LinkedBlockingDeque<>(mergeQueueCapacity), + Executors.defaultThreadFactory(), + new ThreadPoolExecutor.DiscardOldestPolicy() + ); + return threadPoolExecutor; + } + +} diff --git a/eladmin-system/src/main/java/me/zhengjie/modules/abmessage/service/AbMessageService.java b/eladmin-system/src/main/java/me/zhengjie/modules/abmessage/service/AbMessageService.java index 1578098..328e97a 100644 --- a/eladmin-system/src/main/java/me/zhengjie/modules/abmessage/service/AbMessageService.java +++ b/eladmin-system/src/main/java/me/zhengjie/modules/abmessage/service/AbMessageService.java @@ -58,6 +58,15 @@ public interface AbMessageService { * @return */ Slice queryAll(Example example, Pageable pageable); + + /** + * 动态条件查询 + * + * @param criteria 查询的动态条件封装类 + * @param pageable 分页参数封装 + * @return + */ + Slice queryAllSlice(AbMessageQueryCriteria criteria, Pageable pageable); /** * 根据ID查询 * @param id ID diff --git a/eladmin-system/src/main/java/me/zhengjie/modules/abmessage/service/impl/AbMessageServiceImpl.java b/eladmin-system/src/main/java/me/zhengjie/modules/abmessage/service/impl/AbMessageServiceImpl.java index ad26bbc..cf604b7 100644 --- a/eladmin-system/src/main/java/me/zhengjie/modules/abmessage/service/impl/AbMessageServiceImpl.java +++ b/eladmin-system/src/main/java/me/zhengjie/modules/abmessage/service/impl/AbMessageServiceImpl.java @@ -66,6 +66,28 @@ public class AbMessageServiceImpl implements AbMessageService { return abMessageMapper.toDto(abMessageRepository.findAll((root, criteriaQuery, criteriaBuilder) -> QueryHelp.getPredicate(root,criteria,criteriaBuilder))); } + /** + * + * @param example 查询的动态条件封装类 + * @param pageable 分页参数封装 + * @return + */ + @Override + public Slice queryAll(Example example, Pageable pageable) { + return abMessageRepository.findAll((root, criteriaQuery, criteriaBuilder) -> QueryHelp.getPredicate(root,example,criteriaBuilder),pageable); + } + + /** + * + * @param criteria 查询的动态条件封装类 + * @param pageable 分页参数封装 + * @return + */ + @Override + public Slice queryAllSlice(AbMessageQueryCriteria criteria, Pageable pageable) { + return abMessageRepository.findAll((root, criteriaQuery, criteriaBuilder) -> QueryHelp.getPredicate(root,criteria,criteriaBuilder),pageable); + } + @Override @Transactional public AbMessageDto findById(Long id) { @@ -115,16 +137,7 @@ public class AbMessageServiceImpl implements AbMessageService { FileUtil.downloadExcel(list, response); } - /** - * - * @param example 查询的动态条件封装类 - * @param pageable 分页参数封装 - * @return - */ - @Override - public Slice queryAll(Example example, Pageable pageable) { - return abMessageRepository.findAll(example, pageable); - } + @Override @Deprecated diff --git a/eladmin-system/src/main/java/me/zhengjie/modules/form/service/FormMessageService.java b/eladmin-system/src/main/java/me/zhengjie/modules/form/service/FormMessageService.java index 5db912e..8d5f072 100644 --- a/eladmin-system/src/main/java/me/zhengjie/modules/form/service/FormMessageService.java +++ b/eladmin-system/src/main/java/me/zhengjie/modules/form/service/FormMessageService.java @@ -54,11 +54,11 @@ public interface FormMessageService { /** * 动态条件查询 * - * @param example 查询的动态条件封装类 + * @param criteria 查询的动态条件封装类 * @param pageable 分页参数封装 * @return */ - Slice queryAll(Example example, Pageable pageable); + Slice queryAllSlice(FormMessageQueryCriteria criteria, Pageable pageable); /** * 根据ID查询 diff --git a/eladmin-system/src/main/java/me/zhengjie/modules/form/service/impl/FormMessageServiceImpl.java b/eladmin-system/src/main/java/me/zhengjie/modules/form/service/impl/FormMessageServiceImpl.java index bec1f9a..29aa989 100644 --- a/eladmin-system/src/main/java/me/zhengjie/modules/form/service/impl/FormMessageServiceImpl.java +++ b/eladmin-system/src/main/java/me/zhengjie/modules/form/service/impl/FormMessageServiceImpl.java @@ -64,8 +64,8 @@ public class FormMessageServiceImpl implements FormMessageService { } @Override - public Slice queryAll(Example example, Pageable pageable) { - return formMessageRepository.findAll(example, pageable); + public Slice queryAllSlice(FormMessageQueryCriteria criteria, Pageable pageable) { + return formMessageRepository.findAll((root, criteriaQuery, criteriaBuilder) -> QueryHelp.getPredicate(root,criteria,criteriaBuilder), pageable); } @Override diff --git a/eladmin-system/src/main/java/me/zhengjie/modules/taskrecord/repository/TaskRecordRepository.java b/eladmin-system/src/main/java/me/zhengjie/modules/taskrecord/repository/TaskRecordRepository.java index a34ff1c..07f1b3e 100644 --- a/eladmin-system/src/main/java/me/zhengjie/modules/taskrecord/repository/TaskRecordRepository.java +++ b/eladmin-system/src/main/java/me/zhengjie/modules/taskrecord/repository/TaskRecordRepository.java @@ -25,4 +25,6 @@ import org.springframework.data.jpa.repository.JpaSpecificationExecutor; * @date 2020-09-09 **/ public interface TaskRecordRepository extends JpaRepository, JpaSpecificationExecutor { + + } \ No newline at end of file diff --git a/eladmin-system/src/main/java/me/zhengjie/modules/taskrecord/rest/TaskRecordController.java b/eladmin-system/src/main/java/me/zhengjie/modules/taskrecord/rest/TaskRecordController.java index 81cde15..e8faccd 100644 --- a/eladmin-system/src/main/java/me/zhengjie/modules/taskrecord/rest/TaskRecordController.java +++ b/eladmin-system/src/main/java/me/zhengjie/modules/taskrecord/rest/TaskRecordController.java @@ -15,7 +15,7 @@ */ package me.zhengjie.modules.taskrecord.rest; -import cn.hutool.core.util.NumberUtil; +import me.zhengjie.annotation.AnonymousAccess; import me.zhengjie.annotation.Log; import me.zhengjie.common.CommonResponse; import me.zhengjie.common.ResponseCode; @@ -118,9 +118,15 @@ public class TaskRecordController { @PostMapping("/buildTask") @Log("新建课包任务") @ApiOperation("新建课包任务") - @PreAuthorize("@el.check('taskRecord:add')") +// @PreAuthorize("@el.check('taskRecord:add')") + @AnonymousAccess public ResponseEntity buildTask(@RequestBody TaskRecordBuildVO taskRecordBuildVO){ TaskRecord resources = taskRecordBuildVO.getResources(); + // 任务名称不能重复 + String taskName = resources.getTaskName(); + if (!checkTaskNameForBuild(taskName)){ + return new ResponseEntity<>(CommonResponse.createByError(ResponseCode.TASK_NAME_IS_EXIST), HttpStatus.OK); + } String tag = taskRecordBuildVO.getTag(); if (StringUtils.isBlank(tag)){ return new ResponseEntity<>(CommonResponse.createByError(ResponseCode.EMPTY_ARGUMENT), HttpStatus.OK); @@ -131,6 +137,16 @@ public class TaskRecordController { return new ResponseEntity<>(CommonResponse.createBySuccess(), HttpStatus.OK); } + private boolean checkTaskNameForBuild(String taskName){ + TaskRecordQueryCriteria criteria = new TaskRecordQueryCriteria(); + criteria.setTaskName(taskName); + List taskRecordDtos = taskRecordService.queryAll(criteria); + if (!CollectionUtils.isEmpty(taskRecordDtos)){ + return Boolean.FALSE; + } + return Boolean.TRUE; + } + /** * 发送课包任务,支持同事发送多个课包任务,然后进行发送 * @@ -139,8 +155,9 @@ public class TaskRecordController { */ @Log("发送课包任务") @ApiOperation("发送课包任务") - @PreAuthorize("@el.check('taskRecord:list')") +// @PreAuthorize("@el.check('taskRecord:list')") @PostMapping(value = "/sendTask") + @AnonymousAccess public ResponseEntity sendTask(@RequestBody TaskRecordSendVO taskRecordSendVO){ // 参数校验 Integer id = taskRecordSendVO.getId(); @@ -162,8 +179,9 @@ public class TaskRecordController { */ @Log("发送课包数目校验") @ApiOperation("发送课包数目校验") - @PreAuthorize("@el.check('taskRecord:list')") +// @PreAuthorize("@el.check('taskRecord:list')") @PostMapping(value = "/checkSendTotal") + @AnonymousAccess public ResponseEntity checkSendTotal(@RequestParam(value = "sendTotal", defaultValue = "0") Long sendTotal, @RequestParam(value = "id")Integer id){ if (sendTotal == null || id == null){ @@ -198,8 +216,9 @@ public class TaskRecordController { */ @Log("任务课包任务合并") @ApiOperation("任务课包任务合并") - @PreAuthorize("@el.check('taskRecord:list')") +// @PreAuthorize("@el.check('taskRecord:list')") @PostMapping(value = "/mergeRecord") + @AnonymousAccess public ResponseEntity mergeRecord(@RequestBody TaskRecordMergeVO taskRecordMergeVO){ List idList = taskRecordMergeVO.getIdList(); String type = taskRecordMergeVO.getType(); diff --git a/eladmin-system/src/main/java/me/zhengjie/modules/taskrecord/rest/vo/TaskRecordBuildVO.java b/eladmin-system/src/main/java/me/zhengjie/modules/taskrecord/rest/vo/TaskRecordBuildVO.java index 2b61b24..78c8c8f 100644 --- a/eladmin-system/src/main/java/me/zhengjie/modules/taskrecord/rest/vo/TaskRecordBuildVO.java +++ b/eladmin-system/src/main/java/me/zhengjie/modules/taskrecord/rest/vo/TaskRecordBuildVO.java @@ -15,7 +15,7 @@ public class TaskRecordBuildVO implements Serializable { private TaskRecord resources; /** - * 区分AB单和表单的标识 + * 区分AB单和表单 大数据任务 的标识 */ private String tag; diff --git a/eladmin-system/src/main/java/me/zhengjie/modules/taskrecord/service/impl/TaskRecordServiceImpl.java b/eladmin-system/src/main/java/me/zhengjie/modules/taskrecord/service/impl/TaskRecordServiceImpl.java index 96fa61f..58a5cb2 100644 --- a/eladmin-system/src/main/java/me/zhengjie/modules/taskrecord/service/impl/TaskRecordServiceImpl.java +++ b/eladmin-system/src/main/java/me/zhengjie/modules/taskrecord/service/impl/TaskRecordServiceImpl.java @@ -69,12 +69,12 @@ public class TaskRecordServiceImpl implements TaskRecordService { return taskRecordMapper.toDto(taskRecord); } + + @Override @Transactional(rollbackFor = Exception.class) public TaskRecordDto create(TaskRecord resources) { // todo 增加新建课包的逻辑 - - TaskRecordDto taskRecordDto = taskRecordMapper.toDto(taskRecordRepository.save(resources)); return taskRecordDto; } diff --git a/eladmin-system/src/main/java/me/zhengjie/task/MergeRecordFilesTask.java b/eladmin-system/src/main/java/me/zhengjie/task/MergeRecordFilesTask.java index 42512df..c0a5a4e 100644 --- a/eladmin-system/src/main/java/me/zhengjie/task/MergeRecordFilesTask.java +++ b/eladmin-system/src/main/java/me/zhengjie/task/MergeRecordFilesTask.java @@ -1,7 +1,5 @@ package me.zhengjie.task; -import com.alipay.api.domain.Person; -import com.google.common.collect.Lists; import lombok.extern.slf4j.Slf4j; import me.zhengjie.modules.taskrecord.domain.TaskRecord; import me.zhengjie.modules.taskrecord.service.TaskRecordService; @@ -60,7 +58,7 @@ public class MergeRecordFilesTask { * 任务处理入口,主要用于时间记录 * */ - @Async(value = "MergeRecordFilesTask") + @Async(value = "MergeRecordTaskExecutor") public void doRunTask(List ids, String tag, TaskRecord taskRecord){ Long satrtMilliSecond = LocalDateTime.now().toInstant(ZoneOffset.of("+8")).toEpochMilli(); log.info("====== [ task start running, task name is {} ] ======", "MergeRecordFilesTask"); diff --git a/eladmin-system/src/main/java/me/zhengjie/task/ProduceLocalFileTask.java b/eladmin-system/src/main/java/me/zhengjie/task/ProduceLocalFileTask.java index 41ca866..50cce5f 100644 --- a/eladmin-system/src/main/java/me/zhengjie/task/ProduceLocalFileTask.java +++ b/eladmin-system/src/main/java/me/zhengjie/task/ProduceLocalFileTask.java @@ -1,18 +1,23 @@ package me.zhengjie.task; -import cn.hutool.core.util.RandomUtil; import cn.hutool.system.OsInfo; import cn.hutool.system.SystemUtil; import com.alibaba.fastjson.JSON; +import com.sun.org.apache.xpath.internal.operations.Bool; import lombok.extern.slf4j.Slf4j; import me.zhengjie.modules.abmessage.domain.AbMessage; import me.zhengjie.modules.abmessage.service.AbMessageService; +import me.zhengjie.modules.abmessage.service.dto.AbMessageQueryCriteria; import me.zhengjie.modules.form.domain.FormMessage; import me.zhengjie.modules.form.service.FormMessageService; +import me.zhengjie.modules.form.service.dto.FormMessageQueryCriteria; import me.zhengjie.modules.taskrecord.domain.TaskRecord; import me.zhengjie.modules.taskrecord.service.TaskRecordService; +import me.zhengjie.modules.taskrecord.service.dto.TaskRecordDto; +import me.zhengjie.modules.taskrecord.service.dto.TaskRecordQueryCriteria; import me.zhengjie.utils.FileUtil; import me.zhengjie.utils.StringUtils; +import org.springframework.beans.BeanUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.data.domain.Example; @@ -27,10 +32,11 @@ import java.io.IOException; import java.nio.file.Files; import java.nio.file.Paths; import java.nio.file.StandardOpenOption; -import java.time.Instant; import java.time.LocalDateTime; import java.time.ZoneOffset; import java.util.List; +import java.util.Map; +import java.util.Set; import java.util.stream.Collectors; /** @@ -50,7 +56,6 @@ public class ProduceLocalFileTask { */ private static final String TAG_FORM = "FORM"; - /** * 分页查询起始页 - 默认从0开始计算 */ @@ -59,7 +64,7 @@ public class ProduceLocalFileTask { /** * 分页查询 - 单次2000条 */ - private static final int SIZE_PAGE = 2000; + private static final int SIZE_PAGE = 200; /** * 构建成功标识 @@ -69,17 +74,17 @@ public class ProduceLocalFileTask { /** * Windows环境下的路径信息 */ - @Value(value = "file.windows.path") + @Value(value = "${file.windows.path}") private String windowsFilePath; /** * Windows环境下的路径信息 */ - @Value(value = "file.linux.path") + @Value(value = "${file.linux.path}") private String linuxFilePath; /** * Windows环境下的路径信息 */ - @Value(value = "file.mac.path") + @Value(value = "${file.mac.path}") private String macFilePath; @Autowired @@ -107,12 +112,11 @@ public class ProduceLocalFileTask { private void runTask(TaskRecord task, String tag) { // 转换需要的查询参数 String params = task.getParams(); - TaskRecord taskRecord = new TaskRecord(); // 组装分页查询参数 Pageable pageable = PageRequest.of(START_PAGE, SIZE_PAGE); // 设置不同的查询条件,去查AB表和Form表的数据 // 构建写入文件的路径 - buildFileWritePath(taskRecord); + buildFileWritePath(task); if (StringUtils.equalsIgnoreCase(TAG_AB, tag)){ // Json参数解析 AbMessage abMessage = new AbMessage(); @@ -120,7 +124,7 @@ public class ProduceLocalFileTask { abMessage = JSON.parseObject(params, AbMessage.class); } // 构建分页查询,并写入到文件中 - buildABQueryToFile(taskRecord, pageable, abMessage); + buildABQueryToFile(task, pageable, abMessage); } if (StringUtils.equalsIgnoreCase(TAG_FORM, tag)){ @@ -130,7 +134,7 @@ public class ProduceLocalFileTask { formMessage = JSON.parseObject(params, FormMessage.class); } // 构建分页查询,并写入到文件中 - buildFromQueryToFile(taskRecord, pageable, formMessage); + buildFromQueryToFile(task, pageable, formMessage); } } @@ -139,22 +143,30 @@ public class ProduceLocalFileTask { log.error("============[Parse formMessage fail, please check formMessage info ]============"); return; } - Example example = Example.of(formMessage); + FormMessageQueryCriteria formMessageQueryCriteria = new FormMessageQueryCriteria(); + BeanUtils.copyProperties(formMessage, formMessageQueryCriteria); + int count = 0; // 记录总的生成记录数 while(true){ // AB表查询 - Slice slice = formMessageService.queryAll(example, pageable); + Slice slice = formMessageService.queryAllSlice(formMessageQueryCriteria, pageable); if (slice == null){ - log.error("============[Query FormMessage fail, please check example info : {} ]============", example.getProbe().toString()); + log.error("============[Query FormMessage fail, please check example info : {} ]============", formMessageQueryCriteria.toString()); return; } - List abMessageList = slice.getContent(); - if (!CollectionUtils.isEmpty(abMessageList)){ + List formMessageList = slice.getContent(); + if (!CollectionUtils.isEmpty(formMessageList)){ // 只获取需要的字段 - List collect = abMessageList.stream() + List collect = formMessageList.stream() .map(FormMessage::getUid) .collect(Collectors.toList()); // 设置总数 - taskRecord.setTotal((long) slice.getSize()); + count += formMessageList.size(); + taskRecord.setTotal((long) count); + TaskRecordDto taskRecordDto = taskRecordService.create(taskRecord); + if (taskRecordDto == null){ + log.error("============[Write taskRecord fail, please check example info : {} ]============"); + return; + } // 解析并写入文件 writeToFile(taskRecord, collect); } @@ -170,12 +182,14 @@ public class ProduceLocalFileTask { log.error("============[Parse abMessage fail, please check abMessage info ]============"); return; } - Example example = Example.of(abMessage); + AbMessageQueryCriteria abMessageQueryCriteria = new AbMessageQueryCriteria(); + BeanUtils.copyProperties(abMessage, abMessageQueryCriteria); + int count = 0; // 记录总的生成记录数 while(true){ // AB表查询 - Slice slice = abMessageService.queryAll(example, pageable); + Slice slice = abMessageService.queryAllSlice(abMessageQueryCriteria, pageable); if (slice == null){ - log.error("============[Query abMessage fail, please check example info : {} ]============", example.getProbe().toString()); + log.error("============[Query abMessage fail, please check example info : {} ]============", abMessageQueryCriteria.toString()); return; } List abMessageList = slice.getContent(); @@ -185,11 +199,19 @@ public class ProduceLocalFileTask { .map(AbMessage::getUid) .collect(Collectors.toList()); // 设置总数 - taskRecord.setTotal((long) slice.getSize()); + count += abMessageList.size(); + taskRecord.setTotal((long) count); // 解析并写入文件 + TaskRecordDto taskRecordDto = taskRecordService.create(taskRecord); + if (taskRecordDto == null){ + log.error("============[Write taskRecord fail, please check example info : {} ]============"); + return; + } writeToFile(taskRecord, collect); } if (!slice.hasNext()){ + // 更新发送记录 + updateRecord(taskRecord); break; } pageable = slice.nextPageable(); @@ -206,13 +228,15 @@ public class ProduceLocalFileTask { OsInfo osInfo = SystemUtil.getOsInfo(); String fullPath; if (osInfo.isWindows()){ + FileUtil.mkdir(windowsFilePath); // 构建存储文件 fullPath = windowsFilePath + FileUtil.buildOnlyFileNameRule(); }else if (osInfo.isLinux()){ + FileUtil.mkdir(linuxFilePath); // 构建存储文件 fullPath = linuxFilePath + FileUtil.buildOnlyFileNameRule(); }else if (osInfo.isMac()){ - // 构建存储文件 + FileUtil.mkdir(macFilePath); // 构建存储文件 fullPath = macFilePath + FileUtil.buildOnlyFileNameRule(); }else { @@ -236,25 +260,31 @@ public class ProduceLocalFileTask { try { if (!FileUtil.exist(fullPath)){ Files.write(Paths.get(fullPath), collect, StandardOpenOption.CREATE_NEW); - taskRecord.setIsBuild(SUCESS_BUILD_TAG); - TaskRecord update = taskRecordService.update(taskRecord); - if (update == null){ - log.error("============[Write taskRecord fail, please check taskRecord info : {} ]============", taskRecord.toString()); - } }else{ Files.write(Paths.get(fullPath), collect, StandardOpenOption.APPEND); - taskRecord.setIsBuild(SUCESS_BUILD_TAG); - TaskRecord update = taskRecordService.update(taskRecord); - if (update == null){ - log.error("============[Write taskRecord fail, please check taskRecord info : {} ]============", taskRecord.toString()); - } } } catch (IOException e) { log.error("write prepare send file fail, please check param, fullPath is {}, {}", fullPath, e ); } } - + private boolean updateRecord(TaskRecord taskRecord){ + // 查询并更新发送记录 + TaskRecordQueryCriteria queryCriteria = new TaskRecordQueryCriteria(); + queryCriteria.setTaskName(taskRecord.getTaskName()); + List taskRecordDtos = taskRecordService.queryAll(queryCriteria); + if (!CollectionUtils.isEmpty(taskRecordDtos) && taskRecordDtos.size() == 1){ + TaskRecordDto taskRecordDto = taskRecordDtos.get(0); + taskRecord.setId(taskRecordDto.getId()); + taskRecord.setIsBuild(SUCESS_BUILD_TAG); + TaskRecord update = taskRecordService.update(taskRecord); + if (update == null){ + log.error("============[Write taskRecord fail, please check taskRecord info : {} ]============", taskRecord.toString()); + return Boolean.FALSE; + } + } + return Boolean.TRUE; + } } diff --git a/eladmin-system/src/main/java/me/zhengjie/task/SendRecordTask.java b/eladmin-system/src/main/java/me/zhengjie/task/SendRecordTask.java index 92fc4b1..dd27ed2 100644 --- a/eladmin-system/src/main/java/me/zhengjie/task/SendRecordTask.java +++ b/eladmin-system/src/main/java/me/zhengjie/task/SendRecordTask.java @@ -50,13 +50,12 @@ public class SendRecordTask { /** * 下游发送地址 */ - @Value(value = "req.db.url") + @Value(value = "${req.db.url}") private String url; @Autowired private TaskRecordService taskRecordService; - /** * 任务处理入口,主要用于时间记录 * @@ -71,6 +70,7 @@ public class SendRecordTask { log.info("====== [ task start end, task name is {},cost milliSecond is {} ] ======", "SendRecordTask", (endMilliSecond - satrtMilliSecond)); } + private void runTask(Integer id, Long limit) { // 准备所有需要发送任务 TaskRecordDto taskRecordDto = taskRecordService.findById(id); diff --git a/eladmin-system/src/main/resources/config/application-dev.yml b/eladmin-system/src/main/resources/config/application-dev.yml index b5a32ff..6d61405 100644 --- a/eladmin-system/src/main/resources/config/application-dev.yml +++ b/eladmin-system/src/main/resources/config/application-dev.yml @@ -115,3 +115,25 @@ file: req: db: url: # 设置给大坝回传号码的地址 +# 线程池的相关配置 +produce: + task: + thread_pool: + corePoolSize: 2 + maxPoolSize: 16 + queueCapacity: 3 + ThreadNamePrefix: 'ProduceLocalFileTaskExecutor-' +send: + task: + thread_pool: + corePoolSize: 2 + maxPoolSize: 16 + queueCapacity: 3 + ThreadNamePrefix: 'ProduceLocalFileTaskExecutor-' +merge: + task: + thread_pool: + corePoolSize: 2 + maxPoolSize: 16 + queueCapacity: 3 + ThreadNamePrefix: 'ProduceLocalFileTaskExecutor-' diff --git a/eladmin-system/src/main/resources/config/application-prod.yml b/eladmin-system/src/main/resources/config/application-prod.yml index f1447ed..f7cb103 100644 --- a/eladmin-system/src/main/resources/config/application-prod.yml +++ b/eladmin-system/src/main/resources/config/application-prod.yml @@ -120,3 +120,29 @@ file: # 文件大小 /M maxSize: 100 avatarMaxSize: 5 +# 配置请求发送路径 +req: + db: + url: # 设置给大坝回传号码的地址 +# 线程池的相关配置 +produce: + task: + thread_pool: + corePoolSize: 2 + maxPoolSize: 16 + queueCapacity: 3 + ThreadNamePrefix: 'ProduceLocalFileTaskExecutor-' +send: + task: + thread_pool: + corePoolSize: 2 + maxPoolSize: 16 + queueCapacity: 3 + ThreadNamePrefix: 'ProduceLocalFileTaskExecutor-' +merge: + task: + thread_pool: + corePoolSize: 2 + maxPoolSize: 16 + queueCapacity: 3 + ThreadNamePrefix: 'ProduceLocalFileTaskExecutor-' \ No newline at end of file diff --git a/eladmin-system/src/test/java/me/zhengjie/SpringJPATest.java b/eladmin-system/src/test/java/me/zhengjie/SpringJPATest.java index 2d5769f..242cc84 100644 --- a/eladmin-system/src/test/java/me/zhengjie/SpringJPATest.java +++ b/eladmin-system/src/test/java/me/zhengjie/SpringJPATest.java @@ -1,6 +1,10 @@ package me.zhengjie; import lombok.extern.slf4j.Slf4j; +import me.zhengjie.modules.abmessage.domain.AbMessage; +import me.zhengjie.modules.abmessage.service.AbMessageService; +import me.zhengjie.modules.abmessage.service.dto.AbMessageQueryCriteria; +import me.zhengjie.modules.mnt.service.dto.AppQueryCriteria; import me.zhengjie.modules.student.domain.Student; import me.zhengjie.modules.student.repository.StudentRepository; import me.zhengjie.modules.student.service.StudentService; @@ -17,6 +21,7 @@ import java.nio.file.Paths; import java.nio.file.StandardOpenOption; import java.util.ArrayList; import java.util.List; +import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; /** @@ -33,6 +38,9 @@ public class SpringJPATest { @Autowired private StudentRepository studentRepository; + @Autowired + private AbMessageService abMessageService; + @Test public void testQueryBySlice() throws Exception{ @@ -66,4 +74,68 @@ public class SpringJPATest { pageable = slice.nextPageable(); } } + + @Test + public void testExample(){ + Pageable pageable = PageRequest.of(0,200); + AbMessage abMessage = new AbMessage(); + abMessage.setActName("编程猫"); + Example example = Example.of(abMessage); + Slice slice = abMessageService.queryAll(example, pageable); + System.out.println(slice.getContent()); + } + + @Test + public void testExampleQuery(){ + Pageable pageable = PageRequest.of(0,200); + AbMessageQueryCriteria abMessageQueryCriteria = new AbMessageQueryCriteria(); + abMessageQueryCriteria.setActName("编程"); + + int temp = 0; + while(true){ + Slice slice = abMessageService.queryAllSlice(abMessageQueryCriteria, pageable); + + temp += slice.getContent().size(); + if (!slice.hasNext()){ + break; + } + pageable = slice.nextPageable(); + } + System.out.println(temp); + } + + @Test + public void testExampleQuery1(){ + Pageable pageable = PageRequest.of(0,200); + AbMessageQueryCriteria abMessageQueryCriteria = new AbMessageQueryCriteria(); + abMessageQueryCriteria.setActName("编程猫"); + Example example = Example.of(abMessageQueryCriteria); + + Slice slice = abMessageService.queryAll(example, pageable); + System.out.println(slice.getContent()); + } + + @Test + public void testExampleQuery2(){ + Pageable pageable = PageRequest.of(0,200); + AbMessageQueryCriteria abMessageQueryCriteria = new AbMessageQueryCriteria(); + abMessageQueryCriteria.setActName("编程猫"); +// Example example = Example.of(abMessageQueryCriteria); + + Slice slice = abMessageService.queryAllSlice(abMessageQueryCriteria, pageable); + System.out.println(slice.getContent()); + } + + + @Test + public void testExampleQueryPage(){ + Pageable pageable = PageRequest.of(0,200); + AbMessageQueryCriteria abMessageQueryCriteria = new AbMessageQueryCriteria(); + abMessageQueryCriteria.setActName("编程猫"); + + int temp = 0; + Map stringObjectMap = abMessageService.queryAll(abMessageQueryCriteria, pageable); + System.out.println(stringObjectMap.toString()); + + } } diff --git a/eladmin-system/src/test/java/me/zhengjie/TempTest.java b/eladmin-system/src/test/java/me/zhengjie/TempTest.java index 03f6691..76f4384 100644 --- a/eladmin-system/src/test/java/me/zhengjie/TempTest.java +++ b/eladmin-system/src/test/java/me/zhengjie/TempTest.java @@ -1,6 +1,7 @@ package me.zhengjie; -import com.google.common.collect.Lists; +import com.alibaba.fastjson.JSON; +import me.zhengjie.modules.abmessage.domain.AbMessage; import me.zhengjie.utils.FileUtil; import org.junit.Test; @@ -49,4 +50,10 @@ public class TempTest { } } + @Test + public void testJsonParse(){ + String json = "{\"actName\":\"编程猫\"}"; + System.out.println(JSON.parseObject(json, AbMessage.class)); + } + }