diff --git a/eladmin-system/src/main/java/me/zhengjie/config/thread/ThreadPoolConfig.java b/eladmin-system/src/main/java/me/zhengjie/config/thread/ThreadPoolConfig.java index 5142873..e07df73 100644 --- a/eladmin-system/src/main/java/me/zhengjie/config/thread/ThreadPoolConfig.java +++ b/eladmin-system/src/main/java/me/zhengjie/config/thread/ThreadPoolConfig.java @@ -83,6 +83,22 @@ public class ThreadPoolConfig { @Value(value = "${SendBigData.task.thread_pool.ThreadNamePrefix}") private String sendBigDataThreadNamePrefix = "SendBigDataTaskExecutor-"; // fixme 这个属性暂时没用起来 + + /** + * remoteTask + */ + @Value(value = "${remoteRec.task.thread_pool.corePoolSize}") + private int remoteTaskDataCorePoolSize = 2; + @Value(value = "${remoteRec.task.thread_pool.maxPoolSize}") + private int remoteTaskMaxPoolSize = 16; + @Value(value = "${remoteRec.task.thread_pool.queueCapacity}") + private int remoteTaskQueueCapacity = 3; + @Value(value = "${remoteRec.task.thread_pool.ThreadNamePrefix}") + private String remoteTaskThreadNamePrefix = "RemoteTaskExecutor-"; // fixme 这个属性暂时没用起来 + + + + /** * 生成本地课包任务 - 异步线程池的配置类 * @@ -167,4 +183,18 @@ public class ThreadPoolConfig { return threadPoolExecutor; } + @Bean(value = "RemoteTaskExecutor") + public Executor remoteTaskExecutor(){ + ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor( + remoteTaskDataCorePoolSize, + remoteTaskMaxPoolSize, + 3, + TimeUnit.SECONDS, + new LinkedBlockingDeque<>(remoteTaskQueueCapacity), + Executors.defaultThreadFactory(), + new ThreadPoolExecutor.DiscardOldestPolicy() + ); + return threadPoolExecutor; + } + } diff --git a/eladmin-system/src/main/java/me/zhengjie/modules/buildrecord/task/SendBigDataTask.java b/eladmin-system/src/main/java/me/zhengjie/modules/buildrecord/task/SendBigDataTask.java index 9e1efe7..b379b83 100644 --- a/eladmin-system/src/main/java/me/zhengjie/modules/buildrecord/task/SendBigDataTask.java +++ b/eladmin-system/src/main/java/me/zhengjie/modules/buildrecord/task/SendBigDataTask.java @@ -20,6 +20,7 @@ import me.zhengjie.modules.taskrecord.service.TaskRecordService; import me.zhengjie.modules.taskrecord.service.dto.TaskRecordDto; import me.zhengjie.modules.taskrecord.service.dto.TaskRecordQueryCriteria; import me.zhengjie.utils.ConvertUtil; +import me.zhengjie.utils.FileUtil; import me.zhengjie.utils.HttpUtil; import me.zhengjie.utils.StringUtils; import org.springframework.beans.factory.annotation.Autowired; @@ -31,23 +32,23 @@ import org.springframework.scheduling.annotation.Async; import org.springframework.stereotype.Component; import java.io.IOException; +import java.nio.charset.Charset; import java.nio.file.Files; import java.nio.file.Paths; import java.time.LocalDateTime; import java.time.ZoneOffset; import java.util.*; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Collectors; +import static me.zhengjie.modules.common.consts.SendBigDataConst.NON_FINISH_SEND_STATATUS; + @Component @Scope("prototype") @Slf4j public class SendBigDataTask { - /** - * 未发送的状态 - */ - private static final int NON_FINISH_SEND_STATATUS = 0; /** * 每次发送条数限制 @@ -94,6 +95,7 @@ public class SendBigDataTask { } private void runTask(BuildRecord resource, SendBigDataDTO sendBigDataDTO) { + // 根据发送任务的Id来读取发送号码表 Integer id = resource.getId(); log.info("id: {} ", id); @@ -102,56 +104,38 @@ public class SendBigDataTask { return; } BuildRecordDto buildRecordDto = buildRecordService.findById(id); - buildRecordDto.setTaskBuildId(resource.getTaskBuildId()); - // 查询记录表,返回这个id去发送记录表中进行查找 - 构建发送体 - TagQueryCriteria tagQueryCriteria = new TagQueryCriteria(); - tagQueryCriteria.setTaskId(Long.valueOf(id)); - tagQueryCriteria.setPushStatus(NON_FINISH_SEND_STATATUS); - // 加入分页防止查不动的情况 这里可以分页进行查询 - 目前不进行优化 - 为全查 - Pageable pageable = PageRequest.of(0, sendBigDataDTO.getLimit().intValue()); - List backContent = tagService.queryAllBySlice(tagQueryCriteria, pageable); - // 遍历查询等待发送的列表 - List collect = backContent.stream() - .distinct() - .collect(Collectors.toList()); - // 进行去重 - String resultFilePath = null; - String onlyName = sendBigDataDTO.getOnlyName(); - if (StringUtils.isNotBlank(onlyName)){ - TaskRecordQueryCriteria queryCriteria = new TaskRecordQueryCriteria(); - queryCriteria.setTaskName(onlyName); - List taskRecordDtos = taskRecordService.queryAll(queryCriteria); - if (CollectionUtil.isNotEmpty(taskRecordDtos) && taskRecordDtos.size() == 1){ - resultFilePath = taskRecordDtos.get(0).getLocalFilePath(); - } - } + // 通过是否含有 BuildRecordDto-params 来区分来源 + List collect; - /** - * - * modify by x , 2020-12-15 - * - * 优化详细见 - * - * @see me.zhengjie.StreamTest#testFilter() - */ - if (StringUtils.isNotBlank(resultFilePath)){ - Set fileLines = new HashSet<>(50_0000); - try { - fileLines = new HashSet<>(Files.readAllLines(Paths.get(resultFilePath))); - } catch (IOException e) { - log.error("================== {read file error , please check is , file path is : {} } ================================", resultFilePath, e); - } - // 过滤的集合 - Set filterCollect = collect.stream().parallel().map(Tag::getUid).filter(fileLines::contains).collect(Collectors.toSet()); - if (CollectionUtil.isNotEmpty(filterCollect)){ - log.info("================== [Filter collect is ready, collect size is {} ] ================================",filterCollect.size()); - collect = collect.stream().filter(one -> !filterCollect.contains(one.getUid())).collect(Collectors.toList()); - } + List remoteCollect; + + // 获取需要进行去重的任务路径 + String resultFilePath = getPreFilterPath(sendBigDataDTO); + + if (StringUtils.isNotBlank(buildRecordDto.getParams())){ + // 本地表库中的记录的源 + collect = getTagsFromLocalDBRec(resource, sendBigDataDTO, buildRecordDto); + // 进行去重逻辑 + collect = getNotDuplicateContent(collect, resultFilePath); + // 批量发送并且更新记录内容 + // 对需要发送的字段进行发送 + AtomicLong atomicLong = batchSend(collect, sendBigDataDTO, buildRecordDto); + finishSendThenUpdateRec(resource, buildRecordDto, atomicLong); + }else { + // 远程文件存储形式的源 + remoteCollect = getRemoteRecFromLocal(buildRecordDto); + // 进行去重逻辑 + remoteCollect = getNotDuplicateRemoteContent(remoteCollect, resultFilePath); + // 批量发送信息并且更新记录 + AtomicLong atomicLong = batchSendRemote(remoteCollect, sendBigDataDTO, buildRecordDto); + finishSendThenUpdateRec(resource, buildRecordDto, atomicLong); } - // 乱序 + + // 乱序 (乱序的逻辑写在SQL语句中) // Collections.shuffle(collect); - // 对需要发送的字段进行发送 - AtomicLong atomicLong = batchSend(collect, sendBigDataDTO, buildRecordDto); + } + + private void finishSendThenUpdateRec(BuildRecord resource, BuildRecordDto buildRecordDto, AtomicLong atomicLong) { // 对发送后的状态进行更新 buildRecordDto.setIsSend(FINISH_SEND_TAG); BuildRecord buildRecord = new BuildRecord(); @@ -160,7 +144,7 @@ public class SendBigDataTask { // 之前要进行校验 Long dbCount = buildRecordDto.getSendTotal(); // 这个数字会不准确和最开始的记录可能有冲突,这边加一个统计校验 - Long sendCountFromDB = tagService.countSendSum(Long.valueOf(id), FINISH_SEND_TAG); + Long sendCountFromDB = tagService.countSendSum(Long.valueOf(resource.getId()), FINISH_SEND_TAG); if (dbCount == null){ dbCount = 0L; } @@ -180,6 +164,87 @@ public class SendBigDataTask { } } + private List getNotDuplicateRemoteContent(List remoteCollect, String resultFilePath) { + if (StringUtils.isNotBlank(resultFilePath)){ + Set fileLines = new HashSet<>(50_0000); + try { + fileLines = new HashSet<>(Files.readAllLines(Paths.get(resultFilePath))); + } catch (IOException e) { + log.error("================== {read file error , please check is , file path is : {} } ================================", resultFilePath, e); + } + // 过滤的集合 + Set filterCollect = remoteCollect.stream().parallel().filter(fileLines::contains).collect(Collectors.toSet()); + if (CollectionUtil.isNotEmpty(filterCollect)){ + log.info("================== [Filter collect is ready, collect size is {} ] ================================",filterCollect.size()); + remoteCollect = remoteCollect.stream().filter(one -> !filterCollect.contains(one)).collect(Collectors.toList()); + } + } + return remoteCollect; + } + + /** + * 进行去重 + */ + private List getNotDuplicateContent(List collect, String resultFilePath) { + /** + * modify by x , 2020-12-15 + * 优化详细见 + * @see me.zhengjie.StreamTest#testFilter() + */ + if (StringUtils.isNotBlank(resultFilePath)){ + Set fileLines = new HashSet<>(50_0000); + try { + fileLines = new HashSet<>(Files.readAllLines(Paths.get(resultFilePath))); + } catch (IOException e) { + log.error("================== {read file error , please check is , file path is : {} } ================================", resultFilePath, e); + } + // 过滤的集合 + Set filterCollect = collect.stream().parallel().map(Tag::getUid).filter(fileLines::contains).collect(Collectors.toSet()); + if (CollectionUtil.isNotEmpty(filterCollect)){ + log.info("================== [Filter collect is ready, collect size is {} ] ================================",filterCollect.size()); + collect = collect.stream().filter(one -> !filterCollect.contains(one.getUid())).collect(Collectors.toList()); + } + } + return collect; + } + + private String getPreFilterPath(SendBigDataDTO sendBigDataDTO) { + String resultFilePath = null; + String onlyName = sendBigDataDTO.getOnlyName(); + if (StringUtils.isNotBlank(onlyName)){ + TaskRecordQueryCriteria queryCriteria = new TaskRecordQueryCriteria(); + queryCriteria.setTaskName(onlyName); + List taskRecordDtos = taskRecordService.queryAll(queryCriteria); + if (CollectionUtil.isNotEmpty(taskRecordDtos) && taskRecordDtos.size() == 1){ + resultFilePath = taskRecordDtos.get(0).getLocalFilePath(); + } + } + return resultFilePath; + } + + private List getRemoteRecFromLocal(BuildRecordDto buildRecordDto) { + String localFilePath = buildRecordDto.getLocalFilePath(); + if (StringUtils.isBlank(localFilePath)){ + log.error("SendBigDataTask|getRemoteRecFromLocal , localFilePath is null!"); + } + return FileUtil.readLines(localFilePath, Charset.defaultCharset()); + } + + private List getTagsFromLocalDBRec(BuildRecord resource, SendBigDataDTO sendBigDataDTO, BuildRecordDto buildRecordDto) { + buildRecordDto.setTaskBuildId(resource.getTaskBuildId()); + // 查询记录表,返回这个id去发送记录表中进行查找 - 构建发送体 + TagQueryCriteria tagQueryCriteria = new TagQueryCriteria(); + tagQueryCriteria.setTaskId(Long.valueOf(resource.getId())); + tagQueryCriteria.setPushStatus(NON_FINISH_SEND_STATATUS); + // 加入分页防止查不动的情况 这里可以分页进行查询 - 目前不进行优化 - 为全查 + Pageable pageable = PageRequest.of(0, sendBigDataDTO.getLimit().intValue()); + List backContent = tagService.queryAllBySlice(tagQueryCriteria, pageable); + // 遍历查询等待发送的列表 + return backContent.stream() + .distinct() + .collect(Collectors.toList()); + } + private AtomicLong batchSend(List collect, SendBigDataDTO sendRecordDTO, BuildRecordDto buildRecordDto) { AtomicLong successCount = new AtomicLong(0L); List> partition = Lists.partition(collect, SEND_LIMIT); @@ -202,7 +267,7 @@ public class SendBigDataTask { // 进行Json解析 String jsonStr = JSON.toJSONString(pushDBJsonContent); if (StringUtils.isNotBlank(jsonStr)){ - log.info("============ [ Pre send Json is : {} ] ============", jsonStr); + log.info("============ [ batchSend Pre send Json is : {} ] ============", jsonStr); int count = 1; // 失败重发请求3次 String address =""; @@ -240,6 +305,59 @@ public class SendBigDataTask { return successCount; } + private AtomicLong batchSendRemote(List collect, SendBigDataDTO sendRecordDTO, BuildRecordDto buildRecordDto) { + AtomicLong successCount = new AtomicLong(0L); + List> partition = Lists.partition(collect, SEND_LIMIT); + partition.forEach( + list->{ + // 调用推送地址进行推送 + PushDBJsonContent pushDBJsonContent = new PushDBJsonContent(); + pushDBJsonContent.setActId(buildRecordDto.getTaskBuildId()); + pushDBJsonContent.setActName(sendRecordDTO.getSendName()); + // 加入每一个号码对应接口字段 + List clientList = new ArrayList<>(); + list.forEach( + each->{ + PushDBJsonContent.Client client = new PushDBJsonContent.Client(); + client.setCellphone(each); + clientList.add(client); + } + ); + pushDBJsonContent.setClientList(clientList); + // 进行Json解析 + String jsonStr = JSON.toJSONString(pushDBJsonContent); + if (StringUtils.isNotBlank(jsonStr)){ + log.info("============ [ batchSendRemote Pre send Json is : {} ] ============", jsonStr); + int count = 1; + // 失败重发请求3次 + String address =""; + while (count <= 3){ + // 对发送请求地址进行准备 + String addressTag = sendRecordDTO.getAddressTag(); + if (StringUtils.isNotBlank(addressTag)){ + address = preSendReqAddress(addressTag); + log.info("========== [DB request address is {} ] =========", address); + } + // 调用HTTP请求发送数据 + HttpResponse httpResponse = HttpUtil.sendPostReq(address, jsonStr); + if (httpResponse.isOk() && httpResponse.body().contains("true")){ + log.info("========== [DB request success, response is {} ] ==========", httpResponse.body()); + successCount.addAndGet(list.size()); + break; + }else{ + count ++; + log.error("========== [DB request fail, response is {} ] ==========", httpResponse.body()); + } + } + if (count > 3) { + log.error("========== [DB update send status fail, url is {} ] ==========", address); + } + } + } + ); + return successCount; + } + private String preSendReqAddress(String tag) { StringBuilder builder = new StringBuilder(); // 处理环境选择问题, 只接受单个 a, b, c 中的任意一个输入,如果不输入就进行随机返回 diff --git a/eladmin-system/src/main/java/me/zhengjie/modules/common/consts/SendBigDataConst.java b/eladmin-system/src/main/java/me/zhengjie/modules/common/consts/SendBigDataConst.java new file mode 100644 index 0000000..6cdf7d3 --- /dev/null +++ b/eladmin-system/src/main/java/me/zhengjie/modules/common/consts/SendBigDataConst.java @@ -0,0 +1,9 @@ +package me.zhengjie.modules.common.consts; + +public class SendBigDataConst { + + /** + * 未发送的状态 + */ + public static final int NON_FINISH_SEND_STATATUS = 0; +} diff --git a/eladmin-system/src/main/java/me/zhengjie/modules/common/design/BigDataTaskTemplate.java b/eladmin-system/src/main/java/me/zhengjie/modules/common/design/BigDataTaskTemplate.java new file mode 100644 index 0000000..5f28944 --- /dev/null +++ b/eladmin-system/src/main/java/me/zhengjie/modules/common/design/BigDataTaskTemplate.java @@ -0,0 +1,52 @@ +package me.zhengjie.modules.common.design; + +import me.zhengjie.modules.buildrecord.domain.BuildRecord; +import me.zhengjie.modules.buildrecord.service.dto.BuildRecordDto; +import me.zhengjie.modules.buildrecord.task.dto.SendBigDataDTO; +import me.zhengjie.modules.tag.domain.Tag; +import me.zhengjie.modules.tag.service.TagService; +import me.zhengjie.modules.tag.service.dto.TagQueryCriteria; +import org.apache.poi.ss.formula.functions.T; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.data.domain.PageRequest; +import org.springframework.data.domain.Pageable; +import org.springframework.stereotype.Component; + +import java.util.List; +import java.util.stream.Collectors; + +import static me.zhengjie.modules.common.consts.SendBigDataConst.NON_FINISH_SEND_STATATUS; + +/** + * 本地库大数据的执行方式 + */ +@Component +public class BigDataTaskTemplate extends SendTaskAbstractTemplate{ + + @Autowired + private TagService tagService; + @Override + protected List preOperationAbstractMethod(BuildRecordDto buildRecordDto, BuildRecord resource, SendBigDataDTO sendBigDataDTO) { + + buildRecordDto.setTaskBuildId(resource.getTaskBuildId()); + // 查询记录表,返回这个id去发送记录表中进行查找 - 构建发送体 + TagQueryCriteria tagQueryCriteria = new TagQueryCriteria(); + tagQueryCriteria.setTaskId(Long.valueOf(resource.getId())); + tagQueryCriteria.setPushStatus(NON_FINISH_SEND_STATATUS); + // 加入分页防止查不动的情况 这里可以分页进行查询 - 目前不进行优化 - 为全查 + Pageable pageable = PageRequest.of(0, sendBigDataDTO.getLimit().intValue()); + List backContent = tagService.queryAllBySlice(tagQueryCriteria, pageable); + // 遍历查询等待发送的列表 + List collect = backContent.stream() + .distinct() + .collect(Collectors.toList()); + +// return collect; + return null; + } + + @Override + protected void preSendContentAbstractMethod() { + + } +} diff --git a/eladmin-system/src/main/java/me/zhengjie/modules/common/design/RemoteDataTaskTemplate.java b/eladmin-system/src/main/java/me/zhengjie/modules/common/design/RemoteDataTaskTemplate.java new file mode 100644 index 0000000..a451ac8 --- /dev/null +++ b/eladmin-system/src/main/java/me/zhengjie/modules/common/design/RemoteDataTaskTemplate.java @@ -0,0 +1,26 @@ +package me.zhengjie.modules.common.design; + +import me.zhengjie.modules.buildrecord.domain.BuildRecord; +import me.zhengjie.modules.buildrecord.service.dto.BuildRecordDto; +import me.zhengjie.modules.buildrecord.task.dto.SendBigDataDTO; +import org.apache.poi.ss.formula.functions.T; +import org.springframework.stereotype.Component; + +import java.util.List; + +/** + * 远程上传任务的执行模板 + * + */ +@Component +public class RemoteDataTaskTemplate extends SendTaskAbstractTemplate{ + @Override + protected List preOperationAbstractMethod(BuildRecordDto buildRecordDto, BuildRecord resource, SendBigDataDTO sendBigDataDTO) { + return null; + } + + @Override + protected void preSendContentAbstractMethod() { + + } +} diff --git a/eladmin-system/src/main/java/me/zhengjie/modules/common/design/SendTaskAbstractTemplate.java b/eladmin-system/src/main/java/me/zhengjie/modules/common/design/SendTaskAbstractTemplate.java new file mode 100644 index 0000000..86ae640 --- /dev/null +++ b/eladmin-system/src/main/java/me/zhengjie/modules/common/design/SendTaskAbstractTemplate.java @@ -0,0 +1,49 @@ +package me.zhengjie.modules.common.design; + + +import me.zhengjie.modules.buildrecord.domain.BuildRecord; +import me.zhengjie.modules.buildrecord.service.dto.BuildRecordDto; +import me.zhengjie.modules.buildrecord.task.dto.SendBigDataDTO; +import org.apache.poi.ss.formula.functions.T; + +import java.util.List; + +/** + * 发送任务的模板 - 因为要对多个任务进行通配 + * + * fixme 抽象成模板模式进行更改 + */ +public abstract class SendTaskAbstractTemplate { + + + /** + * 定义模板方法 + * ps: 定义 SpecificMethod abstractMethod + */ + public void SendTaskTemplateMethod() + { + + // 准备要进行发送的内容 + preSendContentAbstractMethod(); + + // 进行发送前的准备额外操作 +// preOperationAbstractMethod(); + + // 进行分批发送 + batchSendSpecificMethod(); + + // 后续处理 + + } + + protected abstract List preOperationAbstractMethod(BuildRecordDto buildRecordDto, BuildRecord buildRecord, SendBigDataDTO sendBigDataDTO); + + protected abstract void preSendContentAbstractMethod(); + + + protected void batchSendSpecificMethod(){ + + } + + +} diff --git a/eladmin-system/src/main/java/me/zhengjie/modules/common/handler/DeleteFileScheduler.java b/eladmin-system/src/main/java/me/zhengjie/modules/common/handler/DeleteFileScheduler.java index 8150073..5bc1ac4 100644 --- a/eladmin-system/src/main/java/me/zhengjie/modules/common/handler/DeleteFileScheduler.java +++ b/eladmin-system/src/main/java/me/zhengjie/modules/common/handler/DeleteFileScheduler.java @@ -7,6 +7,7 @@ import me.zhengjie.modules.tmpfilerecord.service.dto.TempFileRecordQueryCriteria import me.zhengjie.utils.FileUtil; import me.zhengjie.utils.StringUtils; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; import org.springframework.transaction.annotation.Transactional; @@ -37,9 +38,9 @@ public class DeleteFileScheduler { private TempFileRecordService tempFileRecordService; /** - * 在每天11点进行任务删除 + * 在每天23点进行任务删除 */ -// @Scheduled(cron = "0 0 23 ? * *") + @Scheduled(cron = "0 0 23 ? * *") public void deleteOrUpdateFilePerDayAtElevenClock() { log.info("======== [start run delete/update file scheduler task.] ========"); // 先进行分类 diff --git a/eladmin-system/src/main/java/me/zhengjie/modules/mailtask/rest/MailTaskController.java b/eladmin-system/src/main/java/me/zhengjie/modules/mailtask/rest/MailTaskController.java index 27402b3..251bb8a 100644 --- a/eladmin-system/src/main/java/me/zhengjie/modules/mailtask/rest/MailTaskController.java +++ b/eladmin-system/src/main/java/me/zhengjie/modules/mailtask/rest/MailTaskController.java @@ -86,10 +86,10 @@ public class MailTaskController { @AnonymousAccess // fixme 需要测试完成后进行去除和使用上面的权限注解 public void downLoadOnceFile(HttpServletResponse response, HttpServletRequest request, @RequestParam(value = "rand")String rand){ // 校验参数,进行下载文件 + mailTaskService.downloadFilesByRandCode(rand, response, request); /* if (StringUtils.isBlank(rand)){ return new ResponseEntity<>(CommonResponse.createByError(ResponseCode.EMPTY_ARGUMENT), HttpStatus.OK); }*/ - mailTaskService.downloadFilesByRandCode(rand, response, request); /* if (tag){ return new ResponseEntity<>(CommonResponse.createBySuccessByCode(ResponseCode.SUCCESS_ONCE_LINK_MSG), HttpStatus.OK); }else { diff --git a/eladmin-system/src/main/java/me/zhengjie/modules/remoterec/consts/RemoteRecConst.java b/eladmin-system/src/main/java/me/zhengjie/modules/remoterec/consts/RemoteRecConst.java new file mode 100644 index 0000000..c14fd79 --- /dev/null +++ b/eladmin-system/src/main/java/me/zhengjie/modules/remoterec/consts/RemoteRecConst.java @@ -0,0 +1,37 @@ +package me.zhengjie.modules.remoterec.consts; + +public class RemoteRecConst { + + + /** + * 成功执行的标识 + */ + public static final Integer SUCCESS_TAG = 0; + + /** + * 失败执行的标识 + */ + public static final Integer FAIL_TAG = 1; + + /** + * 文件名的分隔符 _ + */ + public static final String FILE_NAME_SPLIT = "_"; + + + /** + * ext 文件类型分隔符 - 通配 + */ + public static final String FILE_SPLIT = "."; + + /** + * 文件路径的分隔 + */ + public static final String FILE_PATH_SPLIT = ","; + + + /** + * 成功建立任务标识 + */ + public static final Integer SUCCESS_BUILD_TAG = 1; +} diff --git a/eladmin-system/src/main/java/me/zhengjie/modules/remoterec/consts/SysConst.java b/eladmin-system/src/main/java/me/zhengjie/modules/remoterec/consts/SysConst.java new file mode 100644 index 0000000..535b6c0 --- /dev/null +++ b/eladmin-system/src/main/java/me/zhengjie/modules/remoterec/consts/SysConst.java @@ -0,0 +1,19 @@ +package me.zhengjie.modules.remoterec.consts; + +/** + * 系统相关的配置 + */ +public class SysConst { + + // 远程服务器的相关配置 + public static final String REMOTE_TRANS_HOST = "47.110.11.213"; + + public static final Integer REMOTE_TRANS_PORT = 22; + + public static final String REMOTE_TRANS_SSH_USER = "root"; + + public static final String REMOTE_TRANS_SSH_PW = "yuyou@ECS2020"; + + // 测试内容临时定义 + public static final String TEST_USER_NAME = "测试用户"; +} diff --git a/eladmin-system/src/main/java/me/zhengjie/modules/remoterec/domain/RemoteRecord.java b/eladmin-system/src/main/java/me/zhengjie/modules/remoterec/domain/RemoteRecord.java index c125f2f..566f836 100644 --- a/eladmin-system/src/main/java/me/zhengjie/modules/remoterec/domain/RemoteRecord.java +++ b/eladmin-system/src/main/java/me/zhengjie/modules/remoterec/domain/RemoteRecord.java @@ -42,17 +42,14 @@ public class RemoteRecord implements Serializable { private Integer id; @Column(name = "upload_time",nullable = false) - @NotNull @ApiModelProperty(value = "上传时间") private Timestamp uploadTime; @Column(name = "operation",nullable = false) - @NotBlank @ApiModelProperty(value = "上传人/公司") private String operation; @Column(name = "file_trans_success_count",nullable = false) - @NotNull @ApiModelProperty(value = "成功保存数量") private Integer fileTransSuccessCount; @@ -60,16 +57,22 @@ public class RemoteRecord implements Serializable { @ApiModelProperty(value = "发送状态标识") private Integer tag; - @Column(name = "localSavePath",nullable = false) - @NotBlank + @Column(name = "local_save_path",nullable = false) @ApiModelProperty(value = "文件上传保存路径") - private String localsavepath; + private String localSavePath; @Column(name = "weight",nullable = false) - @NotNull @ApiModelProperty(value = "权重值") private Integer weight; + @Column(name = "upload_remote_task_name", nullable = false) + @ApiModelProperty(value = "远程上传的任务名称") + private String uploadRemoteTaskName; + + @Transient // 不需要保存到数据库的字段 + @ApiModelProperty(value = "SFTP传输的文件名称") + private String sftpFilePath; + public void copy(RemoteRecord source){ BeanUtil.copyProperties(source,this, CopyOptions.create().setIgnoreNullValue(true)); } diff --git a/eladmin-system/src/main/java/me/zhengjie/modules/remoterec/rest/RemoteRecordController.java b/eladmin-system/src/main/java/me/zhengjie/modules/remoterec/rest/RemoteRecordController.java index 8b78968..e8cb33e 100644 --- a/eladmin-system/src/main/java/me/zhengjie/modules/remoterec/rest/RemoteRecordController.java +++ b/eladmin-system/src/main/java/me/zhengjie/modules/remoterec/rest/RemoteRecordController.java @@ -15,10 +15,15 @@ */ package me.zhengjie.modules.remoterec.rest; +import me.zhengjie.annotation.AnonymousAccess; import me.zhengjie.annotation.Log; +import me.zhengjie.common.http.CommonResponse; +import me.zhengjie.common.http.ResponseCode; import me.zhengjie.modules.remoterec.domain.RemoteRecord; import me.zhengjie.modules.remoterec.service.RemoteRecordService; import me.zhengjie.modules.remoterec.service.dto.RemoteRecordQueryCriteria; +import me.zhengjie.modules.remoterec.task.DownloadSFTPFileTask; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.domain.Pageable; import lombok.RequiredArgsConstructor; import org.springframework.http.HttpStatus; @@ -28,6 +33,7 @@ import org.springframework.validation.annotation.Validated; import org.springframework.web.bind.annotation.*; import io.swagger.annotations.*; import java.io.IOException; +import java.util.Objects; import javax.servlet.http.HttpServletResponse; /** @@ -43,6 +49,9 @@ public class RemoteRecordController { private final RemoteRecordService remoteRecordService; + @Autowired + private DownloadSFTPFileTask downloadSFTPFileTask; + @Log("导出数据") @ApiOperation("导出数据") @GetMapping(value = "/download") @@ -67,6 +76,28 @@ public class RemoteRecordController { return new ResponseEntity<>(remoteRecordService.create(resources),HttpStatus.CREATED); } + /** + * 远程上传接口的记录更新 + sftp进行文件下载到本地 + * + * @param resources 发送的参数 + * @return 响应结果定义 + */ + @PostMapping("/remote/rec") + @Log("新增远程上传记录并下载远程传来的文件") + @ApiOperation("新增远程上传记录并下载远程传来的文件") +// @PreAuthorize("@el.check('remoteRecord:add')") + @AnonymousAccess // fixme 需要测试完成后进行去除和使用上面的权限注解 + public ResponseEntity createRemoteRecAndDownload(@Validated @RequestBody RemoteRecord resources){ + + if (Objects.isNull(resources)){ + return new ResponseEntity<>(CommonResponse.createByError(ResponseCode.EMPTY_ARGUMENT), HttpStatus.OK); + } + // 调用异步任务进行sftp文件下载 + downloadSFTPFileTask.doRunTask(resources); + // 更新相关的记录 + return new ResponseEntity<>(remoteRecordService.create(resources),HttpStatus.CREATED); + } + @PutMapping @Log("修改远程上传记录") @ApiOperation("修改远程上传记录") diff --git a/eladmin-system/src/main/java/me/zhengjie/modules/remoterec/service/dto/RemoteRecordDto.java b/eladmin-system/src/main/java/me/zhengjie/modules/remoterec/service/dto/RemoteRecordDto.java index 927063e..b1c4473 100644 --- a/eladmin-system/src/main/java/me/zhengjie/modules/remoterec/service/dto/RemoteRecordDto.java +++ b/eladmin-system/src/main/java/me/zhengjie/modules/remoterec/service/dto/RemoteRecordDto.java @@ -44,8 +44,11 @@ public class RemoteRecordDto implements Serializable { private Integer tag; /** 文件上传保存路径 */ - private String localsavepath; + private String localSavePath; /** 权重值 */ private Integer weight; + + /** 元洪城呢个上传任务的名称*/ + private String uploadRemoteTaskName; } \ No newline at end of file diff --git a/eladmin-system/src/main/java/me/zhengjie/modules/remoterec/service/impl/RemoteRecordServiceImpl.java b/eladmin-system/src/main/java/me/zhengjie/modules/remoterec/service/impl/RemoteRecordServiceImpl.java index 1e70f8e..cb1a433 100644 --- a/eladmin-system/src/main/java/me/zhengjie/modules/remoterec/service/impl/RemoteRecordServiceImpl.java +++ b/eladmin-system/src/main/java/me/zhengjie/modules/remoterec/service/impl/RemoteRecordServiceImpl.java @@ -100,7 +100,7 @@ public class RemoteRecordServiceImpl implements RemoteRecordService { map.put("上传人/公司", remoteRecord.getOperation()); map.put("成功保存数量", remoteRecord.getFileTransSuccessCount()); map.put("发送状态标识", remoteRecord.getTag()); - map.put("文件上传保存路径", remoteRecord.getLocalsavepath()); + map.put("文件上传保存路径", remoteRecord.getLocalSavePath()); map.put("权重值", remoteRecord.getWeight()); list.add(map); } diff --git a/eladmin-system/src/main/java/me/zhengjie/modules/remoterec/task/DownloadSFTPFileTask.java b/eladmin-system/src/main/java/me/zhengjie/modules/remoterec/task/DownloadSFTPFileTask.java new file mode 100644 index 0000000..f51b08a --- /dev/null +++ b/eladmin-system/src/main/java/me/zhengjie/modules/remoterec/task/DownloadSFTPFileTask.java @@ -0,0 +1,170 @@ +package me.zhengjie.modules.remoterec.task; + +import cn.hutool.core.bean.BeanUtil; +import cn.hutool.core.util.RandomUtil; +import cn.hutool.extra.ssh.JschUtil; +import cn.hutool.extra.ssh.Sftp; +import cn.hutool.system.OsInfo; +import cn.hutool.system.SystemUtil; +import com.jcraft.jsch.Session; +import lombok.extern.slf4j.Slf4j; +import me.zhengjie.modules.buildrecord.domain.BuildRecord; +import me.zhengjie.modules.buildrecord.service.BuildRecordService; +import me.zhengjie.modules.buildrecord.service.dto.BuildRecordDto; +import me.zhengjie.modules.remoterec.domain.RemoteRecord; +import me.zhengjie.modules.remoterec.service.RemoteRecordService; +import me.zhengjie.modules.remoterec.service.dto.RemoteRecordDto; +import me.zhengjie.utils.ConvertUtil; +import me.zhengjie.utils.FileUtil; +import me.zhengjie.utils.StringUtils; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Scope; +import org.springframework.scheduling.annotation.Async; +import org.springframework.stereotype.Component; + +import java.io.File; +import java.text.SimpleDateFormat; +import java.time.LocalDateTime; +import java.time.ZoneOffset; +import java.util.Date; + +import static me.zhengjie.modules.remoterec.consts.RemoteRecConst.*; +import static me.zhengjie.modules.remoterec.consts.SysConst.*; + +@Component +@Scope("prototype") +@Slf4j +public class DownloadSFTPFileTask { + + + @Autowired + private RemoteRecordService remoteRecordService; + + @Autowired + private BuildRecordService buildRecordService; + + /** + * 远程服务器地址 + */ + @Value("${remote.link.address}") + private String remoteFileServerAddress; + + /** + * 远程上传临时存放文件地址 - linux环境 + */ + @Value("${remote.link.file-base-path-linux}") + private String remoteLinkFileBasePathLinux; + + /** + * 远程上传临时存放文件地址 - linux环境 + */ + @Value("${remote.link.file-base-path-windows}") + private String remoteLinkFileBasePathWindows; + + /** + * 远程上传临时存放文件地址 - linux环境 + */ + @Value("${remote.link.file-base-path-mac}") + private String remoteLinkFileBasePathMac; + + + + @Async(value = "RemoteTaskExecutor") + public void doRunTask(RemoteRecord resources) { + Long satrtMilliSecond = LocalDateTime.now().toInstant(ZoneOffset.of("+8")).toEpochMilli(); + log.info("====== [ DownloadSFTPFileTask task start running, task name is {} ] ======", "SendBigDataTask"); + runTask(resources); + Long endMilliSecond = LocalDateTime.now().toInstant(ZoneOffset.of("+8")).toEpochMilli(); + log.info("====== [ DownloadSFTPFileTask task start end, task name is {},cost milliSecond is {} ] ======", "DownloadSFTPFileTask", ConvertUtil.secondToTime(endMilliSecond - satrtMilliSecond)); + } + + private void runTask(RemoteRecord resources) { + // 更新一条记录,然后状态为待取回状态 + RemoteRecordDto remoteRecordDto = remoteRecordService.create(resources); + // 然后调用sftp进行取回 + String sftpFilePath = resources.getSftpFilePath(); + if (StringUtils.isNotBlank(sftpFilePath)){ + sftpToDownloadFile(sftpFilePath, resources); + } + + // 取回成功/失败,更新记录状态 + RemoteRecord remoteRecord = new RemoteRecord(); + BeanUtil.copyProperties(remoteRecordDto, remoteRecord); + resources.setTag(SUCCESS_TAG); + remoteRecordService.update(remoteRecord); + + // 更新待发送记录表相关的信息 + BuildRecord buildRecord = new BuildRecord(); + + buildRecord.setTaskName(resources.getUploadRemoteTaskName()); + buildRecord.setTimePeriod(7); // 默认保存时间为7天 + buildRecord.setIsBuild(SUCCESS_BUILD_TAG); + buildRecord.setOprationName(resources.getOperation()); + buildRecord.setLocalFilePath(resources.getLocalSavePath()); + buildRecord.setTotal(Long.valueOf(resources.getFileTransSuccessCount())); + + // 先对任务表进行记录 + BuildRecordDto buildRecordDto = buildRecordService.create(buildRecord); + if (buildRecordDto == null) { + log.error("============ [create build rec is fail, please check it. ] ============"); + } + // fixme 更新失败补充推送费用 + } + + private void sftpToDownloadFile(String sftpFilePath, RemoteRecord resources) { + + // 准备sftp配置 + Session session = JschUtil.getSession(REMOTE_TRANS_HOST, REMOTE_TRANS_PORT, REMOTE_TRANS_SSH_USER, REMOTE_TRANS_SSH_PW); + Sftp sftp = JschUtil.createSftp(session); + + // 构建输出 + String fullFileName = FileUtil.getName(sftpFilePath); + if (StringUtils.isBlank(fullFileName)){ + log.error(" ====== [DownloadSFTPFileTask|sftpToDownloadFile, sftp download path is null.] ======"); + resources.setTag(FAIL_TAG); + return; + } + String localSavePath = buildFileWritePath(fullFileName); + resources.setLocalSavePath(localSavePath); + // SFTP 进行下载 + sftp.download(sftpFilePath, FileUtil.file(localSavePath)); + sftp.close(); + } + + /** + * 构建文件上传保存路径 + */ + private String buildFileWritePath(String baseStr) { + // 获取环境配置信息 + OsInfo osInfo = SystemUtil.getOsInfo(); + // 定义的时间格式 + String timeFormate = new SimpleDateFormat("yyyy-MM-dd").format(new Date()); + + String dirPath; + // 生成一个随机文件夹目录,方便整理和打包 + + String filePath = RandomUtil.randomString(6) + FILE_NAME_SPLIT + baseStr; + + if (osInfo.isWindows()) { + dirPath = remoteLinkFileBasePathWindows + timeFormate + File.separator; + FileUtil.mkdir(new File(dirPath)); + // 构建存储文件 + return dirPath + filePath; + } else if (osInfo.isLinux()) { + dirPath = remoteLinkFileBasePathLinux + timeFormate + File.separator; + FileUtil.mkdir(new File(dirPath)); + // 构建存储文件 + return dirPath + filePath; + } else if (osInfo.isMac()) { + dirPath = remoteLinkFileBasePathMac + timeFormate + File.separator; + FileUtil.mkdir(new File(dirPath)); + // 构建存储文件 + return dirPath + filePath; + } else { + return ""; + } + } + + +} diff --git a/eladmin-system/src/main/resources/config/application-dev.yml b/eladmin-system/src/main/resources/config/application-dev.yml index d441816..cd96f5c 100644 --- a/eladmin-system/src/main/resources/config/application-dev.yml +++ b/eladmin-system/src/main/resources/config/application-dev.yml @@ -208,6 +208,13 @@ SendBigData: maxPoolSize: 16 queueCapacity: 3 ThreadNamePrefix: 'SendBigDataTaskExecutor-' +remoteRec: + task: + thread_pool: + corePoolSize: 2 + maxPoolSize: 16 + queueCapacity: 3 + ThreadNamePrefix: 'RemoteRecTaskExecutor-' # 增加日志相关的配置 logging: level: diff --git a/eladmin-system/src/main/resources/config/application-prod.yml b/eladmin-system/src/main/resources/config/application-prod.yml index 51d325c..c09f415 100644 --- a/eladmin-system/src/main/resources/config/application-prod.yml +++ b/eladmin-system/src/main/resources/config/application-prod.yml @@ -166,6 +166,13 @@ SendBigData: maxPoolSize: 16 queueCapacity: 3 ThreadNamePrefix: 'SendBigDataTaskExecutor-' +remoteRec: + task: + thread_pool: + corePoolSize: 2 + maxPoolSize: 16 + queueCapacity: 3 + ThreadNamePrefix: 'RemoteRecTaskExecutor-' # 增加日志相关的配置 logging: level: diff --git a/eladmin-system/src/main/resources/config/application-stg.yml b/eladmin-system/src/main/resources/config/application-stg.yml index 0dbfca5..9732ad3 100644 --- a/eladmin-system/src/main/resources/config/application-stg.yml +++ b/eladmin-system/src/main/resources/config/application-stg.yml @@ -208,6 +208,13 @@ SendBigData: maxPoolSize: 16 queueCapacity: 3 ThreadNamePrefix: 'SendBigDataTaskExecutor-' +remoteRec: + task: + thread_pool: + corePoolSize: 2 + maxPoolSize: 16 + queueCapacity: 3 + ThreadNamePrefix: 'RemoteRecTaskExecutor-' # 增加日志相关的配置 logging: level: diff --git a/eladmin-system/src/main/resources/config/application.yml b/eladmin-system/src/main/resources/config/application.yml index 6a02dab..2817347 100644 --- a/eladmin-system/src/main/resources/config/application.yml +++ b/eladmin-system/src/main/resources/config/application.yml @@ -1,11 +1,11 @@ server: - port: 8000 + port: 8001 spring: freemarker: check-template-location: false profiles: - active: test + active: dev jackson: time-zone: GMT+8 data: diff --git a/eladmin-system/src/main/resources/logback.xml b/eladmin-system/src/main/resources/logback.xml index dcf1306..30ccc85 100644 --- a/eladmin-system/src/main/resources/logback.xml +++ b/eladmin-system/src/main/resources/logback.xml @@ -77,7 +77,7 @@ - - + + \ No newline at end of file diff --git a/eladmin-system/src/test/java/me/zhengjie/TestScheduler.java b/eladmin-system/src/test/java/me/zhengjie/TestScheduler.java index c9263a3..6b73e86 100644 --- a/eladmin-system/src/test/java/me/zhengjie/TestScheduler.java +++ b/eladmin-system/src/test/java/me/zhengjie/TestScheduler.java @@ -11,7 +11,9 @@ public class TestScheduler extends MailTest{ @Test - public void testDel(){ + public void testDel() throws InterruptedException { deleteFileScheduler.deleteOrUpdateFilePerDayAtElevenClock(); + + Thread.sleep(100000000); } }