diff --git a/eladmin-common/src/main/java/me/zhengjie/utils/HttpUtil.java b/eladmin-common/src/main/java/me/zhengjie/utils/HttpUtil.java index a189232..47991a8 100644 --- a/eladmin-common/src/main/java/me/zhengjie/utils/HttpUtil.java +++ b/eladmin-common/src/main/java/me/zhengjie/utils/HttpUtil.java @@ -22,7 +22,7 @@ public class HttpUtil extends cn.hutool.http.HttpUtil { * @param json 请求的body内容 * @return 返回请求结果 */ - private HttpResponse sendPostReq(String url, String json){ + public static HttpResponse sendPostReq(String url, String json){ HttpResponse httpResponse = HttpRequest .post(url) .body(json) diff --git a/eladmin-system/src/main/java/me/zhengjie/modules/abmessage/service/AbMessageService.java b/eladmin-system/src/main/java/me/zhengjie/modules/abmessage/service/AbMessageService.java index 70c61b5..1578098 100644 --- a/eladmin-system/src/main/java/me/zhengjie/modules/abmessage/service/AbMessageService.java +++ b/eladmin-system/src/main/java/me/zhengjie/modules/abmessage/service/AbMessageService.java @@ -18,7 +18,10 @@ package me.zhengjie.modules.abmessage.service; import me.zhengjie.modules.abmessage.domain.AbMessage; import me.zhengjie.modules.abmessage.service.dto.AbMessageDto; import me.zhengjie.modules.abmessage.service.dto.AbMessageQueryCriteria; +import org.springframework.data.domain.Example; import org.springframework.data.domain.Pageable; +import org.springframework.data.domain.Slice; + import java.util.Map; import java.util.List; import java.io.IOException; @@ -47,6 +50,14 @@ public interface AbMessageService { */ List queryAll(AbMessageQueryCriteria criteria); + /** + * 动态条件查询 + * + * @param example 查询的动态条件封装类 + * @param pageable 分页参数封装 + * @return + */ + Slice queryAll(Example example, Pageable pageable); /** * 根据ID查询 * @param id ID diff --git a/eladmin-system/src/main/java/me/zhengjie/modules/abmessage/service/impl/AbMessageServiceImpl.java b/eladmin-system/src/main/java/me/zhengjie/modules/abmessage/service/impl/AbMessageServiceImpl.java index b9b1751..ad26bbc 100644 --- a/eladmin-system/src/main/java/me/zhengjie/modules/abmessage/service/impl/AbMessageServiceImpl.java +++ b/eladmin-system/src/main/java/me/zhengjie/modules/abmessage/service/impl/AbMessageServiceImpl.java @@ -15,6 +15,7 @@ */ package me.zhengjie.modules.abmessage.service.impl; +import com.google.common.collect.Lists; import me.zhengjie.modules.abmessage.domain.AbMessage; import me.zhengjie.utils.ValidationUtil; import me.zhengjie.utils.FileUtil; @@ -24,12 +25,16 @@ import me.zhengjie.modules.abmessage.service.AbMessageService; import me.zhengjie.modules.abmessage.service.dto.AbMessageDto; import me.zhengjie.modules.abmessage.service.dto.AbMessageQueryCriteria; import me.zhengjie.modules.abmessage.service.mapstruct.AbMessageMapper; +import org.springframework.data.domain.Example; +import org.springframework.data.domain.Slice; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; import org.springframework.data.domain.Page; import org.springframework.data.domain.Pageable; import me.zhengjie.utils.PageUtil; import me.zhengjie.utils.QueryHelp; +import org.springframework.util.CollectionUtils; + import java.util.List; import java.util.Map; import java.io.IOException; @@ -110,11 +115,22 @@ public class AbMessageServiceImpl implements AbMessageService { FileUtil.downloadExcel(list, response); } + /** + * + * @param example 查询的动态条件封装类 + * @param pageable 分页参数封装 + * @return + */ @Override + public Slice queryAll(Example example, Pageable pageable) { + return abMessageRepository.findAll(example, pageable); + } + + @Override + @Deprecated public boolean buildSendTask(List abMessageDtos) { // 记录待发送的相关信息 - return false; } } \ No newline at end of file diff --git a/eladmin-system/src/main/java/me/zhengjie/modules/form/service/FormMessageService.java b/eladmin-system/src/main/java/me/zhengjie/modules/form/service/FormMessageService.java index 14ebcd4..5db912e 100644 --- a/eladmin-system/src/main/java/me/zhengjie/modules/form/service/FormMessageService.java +++ b/eladmin-system/src/main/java/me/zhengjie/modules/form/service/FormMessageService.java @@ -15,10 +15,14 @@ */ package me.zhengjie.modules.form.service; +import me.zhengjie.modules.abmessage.domain.AbMessage; import me.zhengjie.modules.form.domain.FormMessage; import me.zhengjie.modules.form.service.dto.FormMessageDto; import me.zhengjie.modules.form.service.dto.FormMessageQueryCriteria; +import org.springframework.data.domain.Example; import org.springframework.data.domain.Pageable; +import org.springframework.data.domain.Slice; + import java.util.Map; import java.util.List; import java.io.IOException; @@ -47,6 +51,15 @@ public interface FormMessageService { */ List queryAll(FormMessageQueryCriteria criteria); + /** + * 动态条件查询 + * + * @param example 查询的动态条件封装类 + * @param pageable 分页参数封装 + * @return + */ + Slice queryAll(Example example, Pageable pageable); + /** * 根据ID查询 * @param id ID diff --git a/eladmin-system/src/main/java/me/zhengjie/modules/form/service/impl/FormMessageServiceImpl.java b/eladmin-system/src/main/java/me/zhengjie/modules/form/service/impl/FormMessageServiceImpl.java index 8b92360..bec1f9a 100644 --- a/eladmin-system/src/main/java/me/zhengjie/modules/form/service/impl/FormMessageServiceImpl.java +++ b/eladmin-system/src/main/java/me/zhengjie/modules/form/service/impl/FormMessageServiceImpl.java @@ -24,6 +24,8 @@ import me.zhengjie.modules.form.service.FormMessageService; import me.zhengjie.modules.form.service.dto.FormMessageDto; import me.zhengjie.modules.form.service.dto.FormMessageQueryCriteria; import me.zhengjie.modules.form.service.mapstruct.FormMessageMapper; +import org.springframework.data.domain.Example; +import org.springframework.data.domain.Slice; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; import org.springframework.data.domain.Page; @@ -61,6 +63,11 @@ public class FormMessageServiceImpl implements FormMessageService { return formMessageMapper.toDto(formMessageRepository.findAll((root, criteriaQuery, criteriaBuilder) -> QueryHelp.getPredicate(root,criteria,criteriaBuilder))); } + @Override + public Slice queryAll(Example example, Pageable pageable) { + return formMessageRepository.findAll(example, pageable); + } + @Override @Transactional public FormMessageDto findById(Long id) { diff --git a/eladmin-system/src/main/java/me/zhengjie/modules/taskrecord/rest/TaskRecordController.java b/eladmin-system/src/main/java/me/zhengjie/modules/taskrecord/rest/TaskRecordController.java index 778086a..ab8f5f9 100644 --- a/eladmin-system/src/main/java/me/zhengjie/modules/taskrecord/rest/TaskRecordController.java +++ b/eladmin-system/src/main/java/me/zhengjie/modules/taskrecord/rest/TaskRecordController.java @@ -15,15 +15,21 @@ */ package me.zhengjie.modules.taskrecord.rest; +import cn.hutool.core.util.NumberUtil; import me.zhengjie.annotation.Log; import me.zhengjie.common.CommonResponse; import me.zhengjie.common.ResponseCode; import me.zhengjie.modules.taskrecord.domain.TaskRecord; +import me.zhengjie.modules.taskrecord.rest.vo.TaskRecordBuildVO; +import me.zhengjie.modules.taskrecord.rest.vo.TaskRecordMergeVO; +import me.zhengjie.modules.taskrecord.rest.vo.TaskRecordSendVO; import me.zhengjie.modules.taskrecord.service.TaskRecordService; import me.zhengjie.modules.taskrecord.service.dto.TaskRecordDto; import me.zhengjie.modules.taskrecord.service.dto.TaskRecordQueryCriteria; +import me.zhengjie.task.MergeRecordFilesTask; import me.zhengjie.task.ProduceLocalFileTask; import me.zhengjie.task.SendRecordTask; +import me.zhengjie.utils.StringUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.domain.Pageable; import lombok.RequiredArgsConstructor; @@ -55,6 +61,8 @@ public class TaskRecordController { private SendRecordTask sendRecordTask; @Autowired private ProduceLocalFileTask produceLocalFileTask; + @Autowired + private MergeRecordFilesTask mergeRecordFilesTask; @Log("导出数据") @ApiOperation("导出数据") @@ -104,36 +112,44 @@ public class TaskRecordController { /** * 新建课包任务 * - * @param resources 新建任务的条件 + * @param taskRecordBuildVO 新建任务对应的VO映射 * @return 返回建立任务完成的状态 */ @PostMapping("/buildTask") @Log("新建课包任务") @ApiOperation("新建课包任务") @PreAuthorize("@el.check('taskRecord:add')") - public ResponseEntity buildTask(@Validated @RequestBody TaskRecord resources){ + public ResponseEntity buildTask(@RequestBody TaskRecordBuildVO taskRecordBuildVO){ + TaskRecord resources = taskRecordBuildVO.getResources(); + String tag = taskRecordBuildVO.getTag(); + if (StringUtils.isBlank(tag)){ + return new ResponseEntity<>(CommonResponse.createByError(ResponseCode.EMPTY_ARGUMENT), HttpStatus.OK); + } // 启动建立课包任务 - produceLocalFileTask.doRunTask(resources); + produceLocalFileTask.doRunTask(resources, tag); // 返回结果 return new ResponseEntity<>(CommonResponse.createBySuccess(), HttpStatus.OK); } - /** * 发送课包任务,支持同事发送多个课包任务,然后进行发送 - * @param idList - * @return + * + * @param taskRecordSendVO 发送任务对应的VO映射 + * @return 返回建立任务完成的状态 */ @Log("发送课包任务") @ApiOperation("发送课包任务") @PreAuthorize("@el.check('taskRecord:list')") @PostMapping(value = "/sendTask") - public ResponseEntity sendTask(@RequestParam(value = "ids") List idList){ - if (CollectionUtils.isEmpty(idList)){ + public ResponseEntity sendTask(@RequestBody TaskRecordSendVO taskRecordSendVO){ + // 参数校验 + Integer id = taskRecordSendVO.getId(); + Long limit = taskRecordSendVO.getLimit(); + if (id == null || limit == null){ return new ResponseEntity<>(CommonResponse.createByError(ResponseCode.EMPTY_ARGUMENT), HttpStatus.OK); } // 调用发送课包任务,开始发送 - sendRecordTask.doRunTask(idList); + sendRecordTask.doRunTask(id, limit); return new ResponseEntity<>(CommonResponse.createBySuccess(), HttpStatus.OK); } @@ -142,7 +158,7 @@ public class TaskRecordController { * * @param sendTotal 本次发送总条数 * @param id 要进行校验的发送任务包的Id - * @return + * @return 返回建立任务完成的状态 */ @Log("发送课包数目校验") @ApiOperation("发送课包数目校验") @@ -174,12 +190,23 @@ public class TaskRecordController { return Boolean.FALSE; } + /** + * 用于课包任务的合并工作 + * + * @param taskRecordMergeVO 课包合并任务对应的VO映射 + * @return 返回建立任务完成的状态 + */ @Log("任务课包任务合并") @ApiOperation("任务课包任务合并") @PreAuthorize("@el.check('taskRecord:list')") @PostMapping(value = "/mergeRecord") - public ResponseEntity mergeRecord(@RequestBody Integer[] ids, @RequestBody TaskRecord taskRecord){ - // TODO: 2020/9/8 0008 发送任务在相关逻辑类中实现 + public ResponseEntity mergeRecord(@RequestBody TaskRecordMergeVO taskRecordMergeVO){ + List idList = taskRecordMergeVO.getIdList(); + String type = taskRecordMergeVO.getType(); + if (CollectionUtils.isEmpty(idList) || StringUtils.isBlank(type)){ + return new ResponseEntity<>(CommonResponse.createByError(ResponseCode.EMPTY_ARGUMENT), HttpStatus.OK); + } + mergeRecordFilesTask.doRunTask(idList, type); return new ResponseEntity<>(HttpStatus.OK); } // ================ 自定义功能实现 end ================ diff --git a/eladmin-system/src/main/java/me/zhengjie/modules/taskrecord/rest/vo/TaskRecordBuildVO.java b/eladmin-system/src/main/java/me/zhengjie/modules/taskrecord/rest/vo/TaskRecordBuildVO.java new file mode 100644 index 0000000..2b61b24 --- /dev/null +++ b/eladmin-system/src/main/java/me/zhengjie/modules/taskrecord/rest/vo/TaskRecordBuildVO.java @@ -0,0 +1,22 @@ +package me.zhengjie.modules.taskrecord.rest.vo; + +import lombok.Data; +import me.zhengjie.modules.taskrecord.domain.TaskRecord; + +import java.io.Serializable; + +@Data +public class TaskRecordBuildVO implements Serializable { + + + /** + * 课包的信息 + */ + private TaskRecord resources; + + /** + * 区分AB单和表单的标识 + */ + private String tag; + +} diff --git a/eladmin-system/src/main/java/me/zhengjie/modules/taskrecord/rest/vo/TaskRecordMergeVO.java b/eladmin-system/src/main/java/me/zhengjie/modules/taskrecord/rest/vo/TaskRecordMergeVO.java new file mode 100644 index 0000000..02758d2 --- /dev/null +++ b/eladmin-system/src/main/java/me/zhengjie/modules/taskrecord/rest/vo/TaskRecordMergeVO.java @@ -0,0 +1,23 @@ +package me.zhengjie.modules.taskrecord.rest.vo; + +import lombok.Data; + +import java.io.Serializable; +import java.util.List; + +/** + * 支持多个课包各种合并操作的VO + */ +@Data +public class TaskRecordMergeVO implements Serializable { + + /** + * 需要执行合并操作的课包Id集合 + */ + private List idList; + + /** + * 需要进行操作的类型 + */ + private String type; +} diff --git a/eladmin-system/src/main/java/me/zhengjie/modules/taskrecord/rest/vo/TaskRecordSendVO.java b/eladmin-system/src/main/java/me/zhengjie/modules/taskrecord/rest/vo/TaskRecordSendVO.java new file mode 100644 index 0000000..3c3d768 --- /dev/null +++ b/eladmin-system/src/main/java/me/zhengjie/modules/taskrecord/rest/vo/TaskRecordSendVO.java @@ -0,0 +1,22 @@ +package me.zhengjie.modules.taskrecord.rest.vo; + +import lombok.Data; + +import java.io.Serializable; + +/** + * 发送课包VO + */ +@Data +public class TaskRecordSendVO implements Serializable { + + /** + * 需要进行发送课包的Id列表 + */ + private Integer id; + + /** + * 该次发送的个数限制 + */ + private Long limit; +} diff --git a/eladmin-system/src/main/java/me/zhengjie/modules/taskrecord/service/TaskRecordService.java b/eladmin-system/src/main/java/me/zhengjie/modules/taskrecord/service/TaskRecordService.java index 4bbd1cb..6fce7a7 100644 --- a/eladmin-system/src/main/java/me/zhengjie/modules/taskrecord/service/TaskRecordService.java +++ b/eladmin-system/src/main/java/me/zhengjie/modules/taskrecord/service/TaskRecordService.java @@ -65,7 +65,7 @@ public interface TaskRecordService { * 编辑 * @param resources / */ - void update(TaskRecord resources); + TaskRecord update(TaskRecord resources); /** * 多选删除 diff --git a/eladmin-system/src/main/java/me/zhengjie/modules/taskrecord/service/impl/TaskRecordServiceImpl.java b/eladmin-system/src/main/java/me/zhengjie/modules/taskrecord/service/impl/TaskRecordServiceImpl.java index 4f9510b..96fa61f 100644 --- a/eladmin-system/src/main/java/me/zhengjie/modules/taskrecord/service/impl/TaskRecordServiceImpl.java +++ b/eladmin-system/src/main/java/me/zhengjie/modules/taskrecord/service/impl/TaskRecordServiceImpl.java @@ -81,11 +81,11 @@ public class TaskRecordServiceImpl implements TaskRecordService { @Override @Transactional(rollbackFor = Exception.class) - public void update(TaskRecord resources) { + public TaskRecord update(TaskRecord resources) { TaskRecord taskRecord = taskRecordRepository.findById(resources.getId()).orElseGet(TaskRecord::new); ValidationUtil.isNull( taskRecord.getId(),"TaskRecord","id",resources.getId()); taskRecord.copy(resources); - taskRecordRepository.save(taskRecord); + return taskRecordRepository.save(taskRecord); } @Override diff --git a/eladmin-system/src/main/java/me/zhengjie/task/MergeRecordFilesTask.java b/eladmin-system/src/main/java/me/zhengjie/task/MergeRecordFilesTask.java new file mode 100644 index 0000000..4d765f3 --- /dev/null +++ b/eladmin-system/src/main/java/me/zhengjie/task/MergeRecordFilesTask.java @@ -0,0 +1,103 @@ +package me.zhengjie.task; + +import com.google.common.collect.Lists; +import lombok.extern.slf4j.Slf4j; +import me.zhengjie.modules.taskrecord.service.TaskRecordService; +import me.zhengjie.modules.taskrecord.service.dto.TaskRecordDto; +import me.zhengjie.utils.StringUtils; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.scheduling.annotation.Async; +import org.springframework.stereotype.Component; +import org.springframework.util.CollectionUtils; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.time.LocalDateTime; +import java.time.ZoneOffset; +import java.util.List; + +@Component +@Slf4j +public class MergeRecordFilesTask { + + + /** + * 两个集合操作 - 交集 - (listA ∩ ListB) + */ + private static final String INTERSECTION_TAG = "intersection"; + + /** + * 两个集合操作 - 交集 -(listA ∪ listB) + */ + private static final String UNION_TAG = "union"; + + /** + * 两个集合操作 - 差集 -(listA - listB) + */ + private static final String SUBTRACTION_TAG = "subtraction"; + + @Autowired + private TaskRecordService taskRecordService; + + /** + * 临时集合 + */ + private List tempList; + + /** + * 任务处理入口,主要用于时间记录 + * + */ + @Async(value = "MergeRecordFilesTask") + public void doRunTask(List ids, String tag){ + Long satrtMilliSecond = LocalDateTime.now().toInstant(ZoneOffset.of("+8")).toEpochMilli(); + log.info("====== [ task start running, task name is {} ] ======", "MergeRecordFilesTask"); + runTask(ids, tag); + Long endMilliSecond = LocalDateTime.now().toInstant(ZoneOffset.of("+8")).toEpochMilli(); + log.info("====== [ task start end, task name is {},cost milliSecond is {} ] ======", "MergeRecordFilesTask", (endMilliSecond - satrtMilliSecond)); + } + + private void runTask(List ids, String tag) { + // 准备需要进行操作的集合内容 + ids.forEach( + id->{ + TaskRecordDto dto = taskRecordService.findById(id); + if (dto != null){ + String localFilePath = dto.getLocalFilePath(); + if (StringUtils.isNotBlank(localFilePath)){ + try { + List list = Files.readAllLines(Paths.get(localFilePath)); + if (!CollectionUtils.isEmpty(list)){ + // 执行相关的集合间的操作 + mergeEachCollect(list, tag); + } + } catch (IOException e) { + log.error("========= [Read file Error , filePath is : {} ] =========", localFilePath, e); + } + } + } + } + ); + } + + private void mergeEachCollect(List list, String tag) { + + if (CollectionUtils.isEmpty(tempList)){ + tempList.addAll(list); + } + + // 交 + if (StringUtils.equalsIgnoreCase(INTERSECTION_TAG, tag)){ + + } + // 并 + if (StringUtils.equalsIgnoreCase(UNION_TAG, tag)){ + + } + // 差 + if (StringUtils.equalsIgnoreCase(SUBTRACTION_TAG, tag)){ + + } + } +} diff --git a/eladmin-system/src/main/java/me/zhengjie/task/ProduceLocalFileTask.java b/eladmin-system/src/main/java/me/zhengjie/task/ProduceLocalFileTask.java index fe1d466..fb47a8f 100644 --- a/eladmin-system/src/main/java/me/zhengjie/task/ProduceLocalFileTask.java +++ b/eladmin-system/src/main/java/me/zhengjie/task/ProduceLocalFileTask.java @@ -1,14 +1,37 @@ package me.zhengjie.task; +import cn.hutool.core.util.RandomUtil; +import cn.hutool.system.OsInfo; +import cn.hutool.system.SystemUtil; +import com.alibaba.fastjson.JSON; import lombok.extern.slf4j.Slf4j; +import me.zhengjie.modules.abmessage.domain.AbMessage; +import me.zhengjie.modules.abmessage.service.AbMessageService; +import me.zhengjie.modules.form.domain.FormMessage; +import me.zhengjie.modules.form.service.FormMessageService; import me.zhengjie.modules.taskrecord.domain.TaskRecord; -import org.apache.poi.ss.formula.functions.T; +import me.zhengjie.modules.taskrecord.service.TaskRecordService; +import me.zhengjie.utils.FileUtil; +import me.zhengjie.utils.StringUtils; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.data.domain.Example; +import org.springframework.data.domain.PageRequest; +import org.springframework.data.domain.Pageable; +import org.springframework.data.domain.Slice; import org.springframework.scheduling.annotation.Async; import org.springframework.stereotype.Component; +import org.springframework.util.CollectionUtils; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.nio.file.StandardOpenOption; +import java.time.Instant; import java.time.LocalDateTime; import java.time.ZoneOffset; import java.util.List; +import java.util.stream.Collectors; /** * 生成本地文件的异步执行任务类 @@ -17,6 +40,65 @@ import java.util.List; @Slf4j public class ProduceLocalFileTask { + /** + * 默认的生成文件前缀 + */ + private static final String PRE_FILE = "out_task_"; + + /** + * 字段分隔符 (目前用于随机文件名中,...) [后续使用范围进行自行补充] + */ + private static final String SEPARATOR = "_"; + + /** + * AB任务标识 + */ + private static final String TAG_AB = "AB"; + + /** + * FORM任务标识 + */ + private static final String TAG_FORM = "FORM"; + + /** + * 随机字符串的长度 (目前用于随机文件名中,...) [后续使用范围进行自行补充] + */ + private static final int RANDOM_STRING_LENGTH = 6; + + /** + * 分页查询起始页 - 默认从0开始计算 + */ + private static final int START_PAGE = 0; + + /** + * 分页查询 - 单次2000条 + */ + private static final int SIZE_PAGE = 2000; + + + /** + * Windows环境下的路径信息 + */ + @Value(value = "file.windows.path") + private String windowsFilePath; + /** + * Windows环境下的路径信息 + */ + @Value(value = "file.linux.path") + private String linuxFilePath; + /** + * Windows环境下的路径信息 + */ + @Value(value = "file.mac.path") + private String macFilePath; + + @Autowired + private TaskRecordService taskRecordService; + @Autowired + private AbMessageService abMessageService; + @Autowired + private FormMessageService formMessageService; + /** * 任务处理入口,主要用于时间记录 * @@ -24,15 +106,174 @@ public class ProduceLocalFileTask { * @param task */ @Async(value = "ProduceLocalFileTaskExecutor") - public void doRunTask(TaskRecord task){ + public void doRunTask(TaskRecord task, String tag){ Long satrtMilliSecond = LocalDateTime.now().toInstant(ZoneOffset.of("+8")).toEpochMilli(); log.info("====== [ task start running, task name is {} ] ======", "ProduceLocalFileTask"); - runTask(task); + runTask(task, tag); Long endMilliSecond = LocalDateTime.now().toInstant(ZoneOffset.of("+8")).toEpochMilli(); log.info("====== [ task start end, task name is {},cost milliSecond is {} ] ======", "ProduceLocalFileTask", (endMilliSecond - satrtMilliSecond)); } - private void runTask(TaskRecord task) { + private void runTask(TaskRecord task, String tag) { + // 转换需要的查询参数 + String params = task.getParams(); + TaskRecord taskRecord = new TaskRecord(); + // 组装分页查询参数 + Pageable pageable = PageRequest.of(START_PAGE, SIZE_PAGE); + // 设置不同的查询条件,去查AB表和Form表的数据 + // 构建写入文件的路径 + buildFileWritePath(taskRecord); + if (StringUtils.equalsIgnoreCase(TAG_AB, tag)){ + // Json参数解析 + AbMessage abMessage = new AbMessage(); + if (StringUtils.isNotBlank(params)){ + abMessage = JSON.parseObject(params, AbMessage.class); + } + // 构建分页查询,并写入到文件中 + buildABQueryToFile(taskRecord, pageable, abMessage); + } + if (StringUtils.equalsIgnoreCase(TAG_FORM, tag)){ + // Json参数解析 + FormMessage formMessage = new FormMessage(); + if (StringUtils.isNotBlank(params)){ + formMessage = JSON.parseObject(params, FormMessage.class); + } + // 构建分页查询,并写入到文件中 + buildFromQueryToFile(taskRecord, pageable, formMessage); + } + } + + private void buildFromQueryToFile(TaskRecord taskRecord, Pageable pageable, FormMessage formMessage) { + if (formMessage == null){ + log.error("============[Parse formMessage fail, please check formMessage info ]============"); + return; + } + Example example = Example.of(formMessage); + while(true){ + // AB表查询 + Slice slice = formMessageService.queryAll(example, pageable); + if (slice == null){ + log.error("============[Query FormMessage fail, please check example info : {} ]============", example.getProbe().toString()); + return; + } + List abMessageList = slice.getContent(); + if (!CollectionUtils.isEmpty(abMessageList)){ + // 只获取需要的字段 + List collect = abMessageList.stream() + .map(FormMessage::getUid) + .collect(Collectors.toList()); + // 设置总数 + taskRecord.setTotal((long) slice.getSize()); + // 解析并写入文件 + writeToFile(taskRecord, collect); + } + if (!slice.hasNext()){ + break; + } + pageable = slice.nextPageable(); + } + } + + private void buildABQueryToFile(TaskRecord taskRecord, Pageable pageable, AbMessage abMessage) { + if (abMessage == null){ + log.error("============[Parse abMessage fail, please check abMessage info ]============"); + return; + } + Example example = Example.of(abMessage); + while(true){ + // AB表查询 + Slice slice = abMessageService.queryAll(example, pageable); + if (slice == null){ + log.error("============[Query abMessage fail, please check example info : {} ]============", example.getProbe().toString()); + return; + } + List abMessageList = slice.getContent(); + if (!CollectionUtils.isEmpty(abMessageList)){ + // 只获取需要的字段 + List collect = abMessageList.stream() + .map(AbMessage::getUid) + .collect(Collectors.toList()); + // 设置总数 + taskRecord.setTotal((long) slice.getSize()); + // 解析并写入文件 + writeToFile(taskRecord, collect); + } + if (!slice.hasNext()){ + break; + } + pageable = slice.nextPageable(); + } + } + + /** + * 解析文件然后入库 + * + * @param taskRecord 需要构建的任务包体 + */ + private void buildFileWritePath(TaskRecord taskRecord){ + // 获取环境配置信息 + OsInfo osInfo = SystemUtil.getOsInfo(); + String fullPath; + if (osInfo.isWindows()){ + // 构建存储文件 + fullPath = windowsFilePath + buildOnlyFileNameRule(); + }else if (osInfo.isLinux()){ + // 构建存储文件 + fullPath = linuxFilePath + buildOnlyFileNameRule(); + }else if (osInfo.isMac()){ + // 构建存储文件 + // 构建存储文件 + fullPath = macFilePath + buildOnlyFileNameRule(); + }else { + return; + } + taskRecord.setLocalFilePath(fullPath); } + + /** + * 待发送记录写入文件中,并记录课包信息 + * + * @param taskRecord 课包记录信息 + * @param collect 一个分页下的数据 + */ + private void writeToFile(TaskRecord taskRecord, List collect){ + String fullPath = taskRecord.getLocalFilePath(); + if (StringUtils.isBlank(fullPath)){ + return; + } + // 构建存储文件 + try { + if (!FileUtil.exist(fullPath)){ + Files.write(Paths.get(fullPath), collect, StandardOpenOption.CREATE_NEW); + TaskRecord update = taskRecordService.update(taskRecord); + if (update == null){ + log.error("============[Write taskRecord fail, please check taskRecord info : {} ]============", taskRecord.toString()); + } + }else{ + Files.write(Paths.get(fullPath), collect, StandardOpenOption.APPEND); + TaskRecord update = taskRecordService.update(taskRecord); + if (update == null){ + log.error("============[Write taskRecord fail, please check taskRecord info : {} ]============", taskRecord.toString()); + } + } + } catch (IOException e) { + log.error("write prepare send file fail, please check param, fullPath is {}, {}", fullPath, e ); + } + } + + /** + * 构建随机文件名规则 + * + * @return 构建好的随机文件名 + */ + private String buildOnlyFileNameRule(){ + String fileName = PRE_FILE + + RandomUtil.randomString(RANDOM_STRING_LENGTH) + + SEPARATOR + + Instant.now().getEpochSecond(); + return fileName; + } + + } diff --git a/eladmin-system/src/main/java/me/zhengjie/task/SendRecordTask.java b/eladmin-system/src/main/java/me/zhengjie/task/SendRecordTask.java index c34a729..f2fc4d6 100644 --- a/eladmin-system/src/main/java/me/zhengjie/task/SendRecordTask.java +++ b/eladmin-system/src/main/java/me/zhengjie/task/SendRecordTask.java @@ -1,11 +1,18 @@ package me.zhengjie.task; +import cn.hutool.http.HttpResponse; +import com.alibaba.fastjson.JSON; +import com.google.common.collect.Lists; import lombok.extern.slf4j.Slf4j; +import me.zhengjie.modules.taskrecord.domain.TaskRecord; import me.zhengjie.modules.taskrecord.service.TaskRecordService; import me.zhengjie.modules.taskrecord.service.dto.TaskRecordDto; +import me.zhengjie.utils.HttpUtil; import me.zhengjie.utils.StringUtils; import org.apache.poi.ss.formula.functions.T; +import org.springframework.beans.BeanUtils; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; import org.springframework.scheduling.annotation.Async; import org.springframework.stereotype.Component; @@ -16,19 +23,31 @@ import java.time.LocalDateTime; import java.time.ZoneOffset; import java.util.ArrayList; import java.util.List; +import java.util.stream.Collectors; /** * 发送记录异步执行任务类 */ @Component @Slf4j -public class SendRecordTask { +public class SendRecordTask { /** * 成功构建标识 */ private static final int BUILD_STATUS = 1; + /** + * 每次发送条数限制 + */ + private static final int SEND_LIMIT = 500; + + /** + * 下游发送地址 + */ + @Value(value = "req.db.url") + private String url; + @Autowired private TaskRecordService taskRecordService; @@ -39,35 +58,74 @@ public class SendRecordTask { * @return */ @Async(value = "SendRecordTaskExecutor") - public void doRunTask(List list){ + public void doRunTask(Integer id, Long limit){ Long satrtMilliSecond = LocalDateTime.now().toInstant(ZoneOffset.of("+8")).toEpochMilli(); log.info("====== [ task start running, task name is {} ] ======", "SendRecordTask"); - runTask(list); + runTask(id, limit); Long endMilliSecond = LocalDateTime.now().toInstant(ZoneOffset.of("+8")).toEpochMilli(); log.info("====== [ task start end, task name is {},cost milliSecond is {} ] ======", "SendRecordTask", (endMilliSecond - satrtMilliSecond)); } - private void runTask(List list) { + private void runTask(Integer id, Long limit) { // 准备所有需要发送任务 - List readySendTasks = new ArrayList<>(); - list.forEach( - id->{ - TaskRecordDto taskRecordDto = taskRecordService.findById(id); - if (BUILD_STATUS == taskRecordDto.getIsBuild() && StringUtils.isNotBlank(taskRecordDto.getLocalFilePath())){ - readySendTasks.add(taskRecordDto); - } + TaskRecordDto taskRecordDto = taskRecordService.findById(id); + if (BUILD_STATUS == taskRecordDto.getIsBuild() && StringUtils.isNotBlank(taskRecordDto.getLocalFilePath())){ + String localFilePath = taskRecordDto.getLocalFilePath(); + if (StringUtils.isBlank(localFilePath)){ + log.error("============== [ localFilePath is Empty, please check taskRecordDto : {} ]==============", taskRecordDto.toString()); + return; + } + try { + // tips : 发送前需要调用前端接口进行发送数目合法性校验 + List lines = Files.readAllLines(Paths.get(localFilePath)); + // 限制发送次数 为limit以下 + Long sendTotal = taskRecordDto.getSendTotal(); + if (sendTotal <= 0 ){ + return; } - ); - // 遍历所有的待发送路径进行发送 - readySendTasks.forEach( - task->{ - String localFilePath = task.getLocalFilePath(); - try { - // 发送前已经通过前端接口进行发送数目合法性校验 - List lines = Files.readAllLines(Paths.get(localFilePath)); + List collect = lines.stream() + .skip(sendTotal) + .limit(limit - sendTotal) + .collect(Collectors.toList()); + + // 分批进行发送 + batchSend(collect); + // 对发送后的记录进行更新 + TaskRecord taskRecord = new TaskRecord(); + BeanUtils.copyProperties(taskRecordDto, taskRecord); + taskRecord.setSendTotal(taskRecordDto.getSendTotal() + limit); + taskRecordService.update(taskRecord); + } catch (IOException e) { + log.error("Read ready send file fail, send class is {}, ready send file path is {}", this.getClass().getName(), localFilePath, e); + } + } + } - } catch (IOException e) { - log.error("read ready send file fail, send class is {}, ready send file path is {}", this.getClass().getName(), localFilePath, e); + private void batchSend(List collect) { + List> partition = Lists.partition(collect, SEND_LIMIT); + partition.forEach( + list->{ + // TODO: 2020/9/10 0010 调用推送地址进行推送 + String jsonStr = JSON.toJSONString(list); + if (StringUtils.isNotBlank(jsonStr)){ + int count = 1; + // 失败重发请求3次 + while (count <= 3){ + // 调用HTTP请求发送数据 + HttpResponse httpResponse = HttpUtil.sendPostReq(url, jsonStr); + // todo 这个判别发送正常的码要进行判别 ok + if (httpResponse.isOk() && httpResponse.body().contains("ok")){ + log.info("========== [DB request success, response is {} ] ==========", httpResponse.body()); + break; + }else{ + count ++; + log.error("========== [DB request fail, response is {} ] ==========", httpResponse.body()); + } + } + if (count > 3) { + log.error("========== [DB update send status fail, url is {} ] ==========", url); + return; + } } } ); diff --git a/eladmin-system/src/test/java/me/zhengjie/SpringJPATest.java b/eladmin-system/src/test/java/me/zhengjie/SpringJPATest.java new file mode 100644 index 0000000..2d5769f --- /dev/null +++ b/eladmin-system/src/test/java/me/zhengjie/SpringJPATest.java @@ -0,0 +1,69 @@ +package me.zhengjie; + +import lombok.extern.slf4j.Slf4j; +import me.zhengjie.modules.student.domain.Student; +import me.zhengjie.modules.student.repository.StudentRepository; +import me.zhengjie.modules.student.service.StudentService; +import me.zhengjie.utils.FileUtil; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.data.domain.*; +import org.springframework.test.context.junit4.SpringRunner; + +import java.nio.file.Files; +import java.nio.file.Paths; +import java.nio.file.StandardOpenOption; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * JPA相关的使用 + */ +@RunWith(SpringRunner.class) +@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT) +@Slf4j +public class SpringJPATest { + + @Autowired + private StudentService studentService; + + @Autowired + private StudentRepository studentRepository; + + @Test + public void testQueryBySlice() throws Exception{ + + Pageable pageable = PageRequest.of(0,2); + Student student = new Student(); + student.setAge(11); + Example example = Example.of(student); + while(true){ + Slice slice = studentRepository.findAll(example, pageable); + List studentList = slice.getContent(); + List strList = new ArrayList<>(); + AtomicInteger count = new AtomicInteger(1); + int i = 1; + studentList.forEach( + each->{ + strList.add(each.toString() + (count.getAndIncrement()) +"_"+ Thread.currentThread().getName()); +// strList.add(each.toString() + (count.getAndIncrement()) +"_"+ Thread.currentThread().getName()); + } + ); + System.out.println("1111"); + log.info("111111"); + String path = "C:\\Users\\Administrator\\Desktop\\2_1"; + if (!FileUtil.exist(path)){ + Files.write(Paths.get(path), strList, StandardOpenOption.CREATE_NEW); + }else{ + Files.write(Paths.get(path), strList, StandardOpenOption.APPEND); + } + if (!slice.hasNext()){ + break; + } + pageable = slice.nextPageable(); + } + } +} diff --git a/eladmin-system/src/test/java/me/zhengjie/TempTest.java b/eladmin-system/src/test/java/me/zhengjie/TempTest.java new file mode 100644 index 0000000..03f6691 --- /dev/null +++ b/eladmin-system/src/test/java/me/zhengjie/TempTest.java @@ -0,0 +1,52 @@ +package me.zhengjie; + +import com.google.common.collect.Lists; +import me.zhengjie.utils.FileUtil; +import org.junit.Test; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.nio.file.StandardOpenOption; +import java.util.Arrays; +import java.util.List; + +/** + * 本地工具方法进行测试的类 + */ +public class TempTest { + + + @Test + public void testListWrite() throws IOException { + List list = Arrays.asList("111","222"); + + Files.write(Paths.get("C:\\Users\\Administrator\\Desktop\\2"), list); + } + + @Test + public void testWriteOne() { + List list = Arrays.asList("111","222"); + list.forEach( + one->{ + try { + Files.write(Paths.get("C:\\Users\\Administrator\\Desktop\\2"), one.getBytes(), StandardOpenOption.APPEND); + } catch (IOException e) { + e.printStackTrace(); + } + } + ); + } + + @Test + public void testListWrite2() throws IOException { + List list = Arrays.asList("111","222"); + String path = "C:\\Users\\Administrator\\Desktop\\2"; + if (!FileUtil.exist(path)){ + Files.write(Paths.get(path), list, StandardOpenOption.CREATE_NEW); + }else{ + Files.write(Paths.get(path), list, StandardOpenOption.APPEND); + } + } + +}