diff --git a/eladmin-system/src/main/java/me/zhengjie/modules/buildrecord/rest/BuildRecordController.java b/eladmin-system/src/main/java/me/zhengjie/modules/buildrecord/rest/BuildRecordController.java index 798ea59..e421994 100644 --- a/eladmin-system/src/main/java/me/zhengjie/modules/buildrecord/rest/BuildRecordController.java +++ b/eladmin-system/src/main/java/me/zhengjie/modules/buildrecord/rest/BuildRecordController.java @@ -158,7 +158,11 @@ public class BuildRecordController { @AnonymousAccess // fixme 需要测试完成后进行去除和使用上面的权限注解 public ResponseEntity sendTask(@RequestBody BuildRecordSendVO buildRecordSendVO){ // 参数校验 - if (buildRecordSendVO == null || buildRecordSendVO.getLimit() == null || buildRecordSendVO.getResource().getTaskBuildId() == null || StringUtils.isBlank(buildRecordSendVO.getSendName())){ + if (buildRecordSendVO == null + || buildRecordSendVO.getResource().getTaskBuildId() == null + || (buildRecordSendVO.getResource().getId() == null && buildRecordSendVO.getRemoteRecord().getId() == null) + || StringUtils.isBlank(buildRecordSendVO.getSendName()) + ){ return new ResponseEntity<>(CommonResponse.createByError(ResponseCode.EMPTY_ARGUMENT), HttpStatus.OK); } SendBigDataDTO sendBigDataDTO = new SendBigDataDTO(); diff --git a/eladmin-system/src/main/java/me/zhengjie/modules/buildrecord/task/SendBigDataTask.java b/eladmin-system/src/main/java/me/zhengjie/modules/buildrecord/task/SendBigDataTask.java index 6222316..79dc384 100644 --- a/eladmin-system/src/main/java/me/zhengjie/modules/buildrecord/task/SendBigDataTask.java +++ b/eladmin-system/src/main/java/me/zhengjie/modules/buildrecord/task/SendBigDataTask.java @@ -14,6 +14,7 @@ import me.zhengjie.modules.buildrecord.service.dto.BuildRecordDto; import me.zhengjie.modules.buildrecord.task.dto.SendBigDataDTO; import me.zhengjie.modules.remoterec.domain.RemoteRecord; import me.zhengjie.modules.remoterec.service.RemoteRecordService; +import me.zhengjie.modules.remoterec.service.dto.RemoteRecordDto; import me.zhengjie.modules.tag.domain.Tag; import me.zhengjie.modules.tag.service.TagService; import me.zhengjie.modules.tag.service.dto.TagDto; @@ -101,23 +102,18 @@ public class SendBigDataTask { private void runTask(BuildRecord resource, RemoteRecord remoteRecord, SendBigDataDTO sendBigDataDTO) { - // 根据发送任务的Id来读取发送号码表 - Integer id = resource.getId(); - log.info("id: {} ", id); - if (id == null || id <= 0 ){ - log.error("====== { ERROR , resource body is {} } =======" , resource.toString()); - return; - } - BuildRecordDto buildRecordDto = buildRecordService.findById(id); // 通过是否含有 BuildRecordDto-params 来区分来源 List collect; - List remoteCollect; // 获取需要进行去重的任务路径 String resultFilePath = getPreFilterPath(sendBigDataDTO); - if (StringUtils.isNotBlank(buildRecordDto.getParams())){ + if (resource.getId() != null){ + // 根据发送任务的Id来读取发送号码表 + Integer id = resource.getId(); + + BuildRecordDto buildRecordDto = buildRecordService.findById(id); // 本地表库中的记录的源 collect = getTagsFromLocalDBRec(resource, sendBigDataDTO, buildRecordDto); // 进行去重逻辑 @@ -126,13 +122,16 @@ public class SendBigDataTask { // 对需要发送的字段进行发送 AtomicLong atomicLong = batchSend(collect, sendBigDataDTO, buildRecordDto); finishSendThenUpdateRec(resource, buildRecordDto, atomicLong); - }else { + } + + if (remoteRecord.getId() != null){ + RemoteRecordDto remoteRecordDto = remoteRecordService.findById(remoteRecord.getId()); // 远程文件存储形式的源 - remoteCollect = getRemoteRecFromLocal(buildRecordDto); + remoteCollect = getRemoteRecFromLocal(remoteRecordDto); // 进行去重逻辑 remoteCollect = getNotDuplicateRemoteContent(remoteCollect, resultFilePath); // 批量发送信息并且更新记录 - AtomicLong atomicLong = batchSendRemote(remoteCollect, sendBigDataDTO, buildRecordDto); + AtomicLong atomicLong = batchSendRemote(remoteCollect, sendBigDataDTO, resource); finishSendThenUpdateRemoteRec(remoteRecord, atomicLong); } @@ -174,7 +173,7 @@ public class SendBigDataTask { RemoteRecord remote = new RemoteRecord(); BeanUtil.copyProperties(remoteRecord, remote); remote.setTag(SUCCESS_TAG); - remote.setWeight(10); + remote.setWeight(10); // 成功发送后把成功发送的记录权重值设置为 10 remote.setSuccessSendCount(atomicLong.get()); remoteRecordService.update(remote); } @@ -239,8 +238,8 @@ public class SendBigDataTask { return resultFilePath; } - private List getRemoteRecFromLocal(BuildRecordDto buildRecordDto) { - String localFilePath = buildRecordDto.getLocalFilePath(); + private List getRemoteRecFromLocal(RemoteRecordDto remoteRecordDto) { + String localFilePath = remoteRecordDto.getLocalSavePath(); if (StringUtils.isBlank(localFilePath)){ log.error("SendBigDataTask|getRemoteRecFromLocal , localFilePath is null!"); } @@ -322,14 +321,14 @@ public class SendBigDataTask { return successCount; } - private AtomicLong batchSendRemote(List collect, SendBigDataDTO sendRecordDTO, BuildRecordDto buildRecordDto) { + private AtomicLong batchSendRemote(List collect, SendBigDataDTO sendRecordDTO, BuildRecord resource) { AtomicLong successCount = new AtomicLong(0L); List> partition = Lists.partition(collect, SEND_LIMIT); partition.forEach( list->{ // 调用推送地址进行推送 PushDBJsonContent pushDBJsonContent = new PushDBJsonContent(); - pushDBJsonContent.setActId(buildRecordDto.getTaskBuildId()); + pushDBJsonContent.setActId(resource.getTaskBuildId()); pushDBJsonContent.setActName(sendRecordDTO.getSendName()); // 加入每一个号码对应接口字段 List clientList = new ArrayList<>(); diff --git a/eladmin-system/src/main/java/me/zhengjie/modules/remoterec/consts/RemoteRecConst.java b/eladmin-system/src/main/java/me/zhengjie/modules/remoterec/consts/RemoteRecConst.java index 1181ddd..3cb9fd6 100644 --- a/eladmin-system/src/main/java/me/zhengjie/modules/remoterec/consts/RemoteRecConst.java +++ b/eladmin-system/src/main/java/me/zhengjie/modules/remoterec/consts/RemoteRecConst.java @@ -13,6 +13,11 @@ public class RemoteRecConst { */ public static final Integer FAIL_TAG = 1; + /** + * 等待发送 + */ + public static final Integer READY_TAG = 2; + /** * 文件名的分隔符 _ */ diff --git a/eladmin-system/src/main/java/me/zhengjie/modules/remoterec/task/DownloadSFTPFileTask.java b/eladmin-system/src/main/java/me/zhengjie/modules/remoterec/task/DownloadSFTPFileTask.java index 05cd757..5dbd1ef 100644 --- a/eladmin-system/src/main/java/me/zhengjie/modules/remoterec/task/DownloadSFTPFileTask.java +++ b/eladmin-system/src/main/java/me/zhengjie/modules/remoterec/task/DownloadSFTPFileTask.java @@ -1,7 +1,11 @@ package me.zhengjie.modules.remoterec.task; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Paths; import java.sql.Timestamp; import cn.hutool.core.bean.BeanUtil; +import cn.hutool.core.collection.CollectionUtil; import cn.hutool.core.util.RandomUtil; import cn.hutool.extra.ssh.JschUtil; import cn.hutool.extra.ssh.Sftp; @@ -29,6 +33,7 @@ import java.text.SimpleDateFormat; import java.time.LocalDateTime; import java.time.ZoneOffset; import java.util.Date; +import java.util.List; import static me.zhengjie.modules.remoterec.consts.RemoteRecConst.*; import static me.zhengjie.modules.remoterec.consts.SysConst.*; @@ -84,16 +89,27 @@ public class DownloadSFTPFileTask { // 然后调用sftp进行取回 String sftpFilePath = resources.getSftpFilePath(); + String remoteFileToKLocal = ""; if (StringUtils.isNotBlank(sftpFilePath)){ - sftpToDownloadFile(sftpFilePath, resources); + remoteFileToKLocal = sftpToDownloadFile(sftpFilePath, resources); } // 取回成功/失败,更新记录状态 RemoteRecord remoteRecord = new RemoteRecord(); BeanUtil.copyProperties(remoteRecordDto, remoteRecord); remoteRecord.setOperation(resources.getOperation()); - remoteRecord.setFileTransSuccessCount(resources.getFileTransSuccessCount()); - remoteRecord.setTag(0); + try { + List stringList = Files.readAllLines(Paths.get(remoteFileToKLocal)); + if(CollectionUtil.isNotEmpty(stringList)){ + remoteRecord.setFileTransSuccessCount(stringList.size()); + stringList.clear(); + } + } catch (IOException e) { + log.error("===== [DownloadSFTPFileTask|runTask, save trans to local file fail, path is {} ] =====", remoteFileToKLocal); + e.printStackTrace(); + } + + remoteRecord.setTag(READY_TAG); remoteRecord.setLocalSavePath(resources.getLocalSavePath()); remoteRecord.setWeight(0); remoteRecord.setUploadRemoteTaskName(resources.getUploadRemoteTaskName()); @@ -103,7 +119,7 @@ public class DownloadSFTPFileTask { // fixme 更新失败补充推送费用 } - private void sftpToDownloadFile(String sftpFilePath, RemoteRecord resources) { + private String sftpToDownloadFile(String sftpFilePath, RemoteRecord resources) { // 准备sftp配置 Session session = JschUtil.getSession(REMOTE_TRANS_HOST, REMOTE_TRANS_PORT, REMOTE_TRANS_SSH_USER, REMOTE_TRANS_SSH_PW); @@ -114,7 +130,7 @@ public class DownloadSFTPFileTask { if (StringUtils.isBlank(fullFileName)){ log.error(" ====== [DownloadSFTPFileTask|sftpToDownloadFile, sftp download path is null.] ======"); resources.setTag(FAIL_TAG); - return; + return ""; } String localSavePath = buildFileWritePath(fullFileName); resources.setLocalSavePath(localSavePath); @@ -123,6 +139,8 @@ public class DownloadSFTPFileTask { sftp.download(sftpFilePath, FileUtil.file(localSavePath)); sftp.close(); + return localSavePath; + } /**