上传平台修复bug最终版2

master
土豆兄弟 4 years ago
parent 58bb8e2315
commit 1f869da9bd

@ -158,7 +158,11 @@ public class BuildRecordController {
@AnonymousAccess // fixme 需要测试完成后进行去除和使用上面的权限注解 @AnonymousAccess // fixme 需要测试完成后进行去除和使用上面的权限注解
public ResponseEntity<Object> sendTask(@RequestBody BuildRecordSendVO buildRecordSendVO){ public ResponseEntity<Object> sendTask(@RequestBody BuildRecordSendVO buildRecordSendVO){
// 参数校验 // 参数校验
if (buildRecordSendVO == null || buildRecordSendVO.getLimit() == null || buildRecordSendVO.getResource().getTaskBuildId() == null || StringUtils.isBlank(buildRecordSendVO.getSendName())){ if (buildRecordSendVO == null
|| buildRecordSendVO.getResource().getTaskBuildId() == null
|| (buildRecordSendVO.getResource().getId() == null && buildRecordSendVO.getRemoteRecord().getId() == null)
|| StringUtils.isBlank(buildRecordSendVO.getSendName())
){
return new ResponseEntity<>(CommonResponse.createByError(ResponseCode.EMPTY_ARGUMENT), HttpStatus.OK); return new ResponseEntity<>(CommonResponse.createByError(ResponseCode.EMPTY_ARGUMENT), HttpStatus.OK);
} }
SendBigDataDTO sendBigDataDTO = new SendBigDataDTO(); SendBigDataDTO sendBigDataDTO = new SendBigDataDTO();

@ -14,6 +14,7 @@ import me.zhengjie.modules.buildrecord.service.dto.BuildRecordDto;
import me.zhengjie.modules.buildrecord.task.dto.SendBigDataDTO; import me.zhengjie.modules.buildrecord.task.dto.SendBigDataDTO;
import me.zhengjie.modules.remoterec.domain.RemoteRecord; import me.zhengjie.modules.remoterec.domain.RemoteRecord;
import me.zhengjie.modules.remoterec.service.RemoteRecordService; import me.zhengjie.modules.remoterec.service.RemoteRecordService;
import me.zhengjie.modules.remoterec.service.dto.RemoteRecordDto;
import me.zhengjie.modules.tag.domain.Tag; import me.zhengjie.modules.tag.domain.Tag;
import me.zhengjie.modules.tag.service.TagService; import me.zhengjie.modules.tag.service.TagService;
import me.zhengjie.modules.tag.service.dto.TagDto; import me.zhengjie.modules.tag.service.dto.TagDto;
@ -101,23 +102,18 @@ public class SendBigDataTask {
private void runTask(BuildRecord resource, RemoteRecord remoteRecord, SendBigDataDTO sendBigDataDTO) { private void runTask(BuildRecord resource, RemoteRecord remoteRecord, SendBigDataDTO sendBigDataDTO) {
// 根据发送任务的Id来读取发送号码表
Integer id = resource.getId();
log.info("id: {} ", id);
if (id == null || id <= 0 ){
log.error("====== { ERROR , resource body is {} } =======" , resource.toString());
return;
}
BuildRecordDto buildRecordDto = buildRecordService.findById(id);
// 通过是否含有 BuildRecordDto-params 来区分来源 // 通过是否含有 BuildRecordDto-params 来区分来源
List<Tag> collect; List<Tag> collect;
List<String> remoteCollect; List<String> remoteCollect;
// 获取需要进行去重的任务路径 // 获取需要进行去重的任务路径
String resultFilePath = getPreFilterPath(sendBigDataDTO); String resultFilePath = getPreFilterPath(sendBigDataDTO);
if (StringUtils.isNotBlank(buildRecordDto.getParams())){ if (resource.getId() != null){
// 根据发送任务的Id来读取发送号码表
Integer id = resource.getId();
BuildRecordDto buildRecordDto = buildRecordService.findById(id);
// 本地表库中的记录的源 // 本地表库中的记录的源
collect = getTagsFromLocalDBRec(resource, sendBigDataDTO, buildRecordDto); collect = getTagsFromLocalDBRec(resource, sendBigDataDTO, buildRecordDto);
// 进行去重逻辑 // 进行去重逻辑
@ -126,13 +122,16 @@ public class SendBigDataTask {
// 对需要发送的字段进行发送 // 对需要发送的字段进行发送
AtomicLong atomicLong = batchSend(collect, sendBigDataDTO, buildRecordDto); AtomicLong atomicLong = batchSend(collect, sendBigDataDTO, buildRecordDto);
finishSendThenUpdateRec(resource, buildRecordDto, atomicLong); finishSendThenUpdateRec(resource, buildRecordDto, atomicLong);
}else { }
if (remoteRecord.getId() != null){
RemoteRecordDto remoteRecordDto = remoteRecordService.findById(remoteRecord.getId());
// 远程文件存储形式的源 // 远程文件存储形式的源
remoteCollect = getRemoteRecFromLocal(buildRecordDto); remoteCollect = getRemoteRecFromLocal(remoteRecordDto);
// 进行去重逻辑 // 进行去重逻辑
remoteCollect = getNotDuplicateRemoteContent(remoteCollect, resultFilePath); remoteCollect = getNotDuplicateRemoteContent(remoteCollect, resultFilePath);
// 批量发送信息并且更新记录 // 批量发送信息并且更新记录
AtomicLong atomicLong = batchSendRemote(remoteCollect, sendBigDataDTO, buildRecordDto); AtomicLong atomicLong = batchSendRemote(remoteCollect, sendBigDataDTO, resource);
finishSendThenUpdateRemoteRec(remoteRecord, atomicLong); finishSendThenUpdateRemoteRec(remoteRecord, atomicLong);
} }
@ -174,7 +173,7 @@ public class SendBigDataTask {
RemoteRecord remote = new RemoteRecord(); RemoteRecord remote = new RemoteRecord();
BeanUtil.copyProperties(remoteRecord, remote); BeanUtil.copyProperties(remoteRecord, remote);
remote.setTag(SUCCESS_TAG); remote.setTag(SUCCESS_TAG);
remote.setWeight(10); remote.setWeight(10); // 成功发送后把成功发送的记录权重值设置为 10
remote.setSuccessSendCount(atomicLong.get()); remote.setSuccessSendCount(atomicLong.get());
remoteRecordService.update(remote); remoteRecordService.update(remote);
} }
@ -239,8 +238,8 @@ public class SendBigDataTask {
return resultFilePath; return resultFilePath;
} }
private List<String> getRemoteRecFromLocal(BuildRecordDto buildRecordDto) { private List<String> getRemoteRecFromLocal(RemoteRecordDto remoteRecordDto) {
String localFilePath = buildRecordDto.getLocalFilePath(); String localFilePath = remoteRecordDto.getLocalSavePath();
if (StringUtils.isBlank(localFilePath)){ if (StringUtils.isBlank(localFilePath)){
log.error("SendBigDataTask|getRemoteRecFromLocal , localFilePath is null!"); log.error("SendBigDataTask|getRemoteRecFromLocal , localFilePath is null!");
} }
@ -322,14 +321,14 @@ public class SendBigDataTask {
return successCount; return successCount;
} }
private AtomicLong batchSendRemote(List<String> collect, SendBigDataDTO sendRecordDTO, BuildRecordDto buildRecordDto) { private AtomicLong batchSendRemote(List<String> collect, SendBigDataDTO sendRecordDTO, BuildRecord resource) {
AtomicLong successCount = new AtomicLong(0L); AtomicLong successCount = new AtomicLong(0L);
List<List<String>> partition = Lists.partition(collect, SEND_LIMIT); List<List<String>> partition = Lists.partition(collect, SEND_LIMIT);
partition.forEach( partition.forEach(
list->{ list->{
// 调用推送地址进行推送 // 调用推送地址进行推送
PushDBJsonContent pushDBJsonContent = new PushDBJsonContent(); PushDBJsonContent pushDBJsonContent = new PushDBJsonContent();
pushDBJsonContent.setActId(buildRecordDto.getTaskBuildId()); pushDBJsonContent.setActId(resource.getTaskBuildId());
pushDBJsonContent.setActName(sendRecordDTO.getSendName()); pushDBJsonContent.setActName(sendRecordDTO.getSendName());
// 加入每一个号码对应接口字段 // 加入每一个号码对应接口字段
List<PushDBJsonContent.Client> clientList = new ArrayList<>(); List<PushDBJsonContent.Client> clientList = new ArrayList<>();

@ -13,6 +13,11 @@ public class RemoteRecConst {
*/ */
public static final Integer FAIL_TAG = 1; public static final Integer FAIL_TAG = 1;
/**
*
*/
public static final Integer READY_TAG = 2;
/** /**
* _ * _
*/ */

@ -1,7 +1,11 @@
package me.zhengjie.modules.remoterec.task; package me.zhengjie.modules.remoterec.task;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.sql.Timestamp; import java.sql.Timestamp;
import cn.hutool.core.bean.BeanUtil; import cn.hutool.core.bean.BeanUtil;
import cn.hutool.core.collection.CollectionUtil;
import cn.hutool.core.util.RandomUtil; import cn.hutool.core.util.RandomUtil;
import cn.hutool.extra.ssh.JschUtil; import cn.hutool.extra.ssh.JschUtil;
import cn.hutool.extra.ssh.Sftp; import cn.hutool.extra.ssh.Sftp;
@ -29,6 +33,7 @@ import java.text.SimpleDateFormat;
import java.time.LocalDateTime; import java.time.LocalDateTime;
import java.time.ZoneOffset; import java.time.ZoneOffset;
import java.util.Date; import java.util.Date;
import java.util.List;
import static me.zhengjie.modules.remoterec.consts.RemoteRecConst.*; import static me.zhengjie.modules.remoterec.consts.RemoteRecConst.*;
import static me.zhengjie.modules.remoterec.consts.SysConst.*; import static me.zhengjie.modules.remoterec.consts.SysConst.*;
@ -84,16 +89,27 @@ public class DownloadSFTPFileTask {
// 然后调用sftp进行取回 // 然后调用sftp进行取回
String sftpFilePath = resources.getSftpFilePath(); String sftpFilePath = resources.getSftpFilePath();
String remoteFileToKLocal = "";
if (StringUtils.isNotBlank(sftpFilePath)){ if (StringUtils.isNotBlank(sftpFilePath)){
sftpToDownloadFile(sftpFilePath, resources); remoteFileToKLocal = sftpToDownloadFile(sftpFilePath, resources);
} }
// 取回成功/失败,更新记录状态 // 取回成功/失败,更新记录状态
RemoteRecord remoteRecord = new RemoteRecord(); RemoteRecord remoteRecord = new RemoteRecord();
BeanUtil.copyProperties(remoteRecordDto, remoteRecord); BeanUtil.copyProperties(remoteRecordDto, remoteRecord);
remoteRecord.setOperation(resources.getOperation()); remoteRecord.setOperation(resources.getOperation());
remoteRecord.setFileTransSuccessCount(resources.getFileTransSuccessCount()); try {
remoteRecord.setTag(0); List<String> stringList = Files.readAllLines(Paths.get(remoteFileToKLocal));
if(CollectionUtil.isNotEmpty(stringList)){
remoteRecord.setFileTransSuccessCount(stringList.size());
stringList.clear();
}
} catch (IOException e) {
log.error("===== [DownloadSFTPFileTask|runTask, save trans to local file fail, path is {} ] =====", remoteFileToKLocal);
e.printStackTrace();
}
remoteRecord.setTag(READY_TAG);
remoteRecord.setLocalSavePath(resources.getLocalSavePath()); remoteRecord.setLocalSavePath(resources.getLocalSavePath());
remoteRecord.setWeight(0); remoteRecord.setWeight(0);
remoteRecord.setUploadRemoteTaskName(resources.getUploadRemoteTaskName()); remoteRecord.setUploadRemoteTaskName(resources.getUploadRemoteTaskName());
@ -103,7 +119,7 @@ public class DownloadSFTPFileTask {
// fixme 更新失败补充推送费用 // fixme 更新失败补充推送费用
} }
private void sftpToDownloadFile(String sftpFilePath, RemoteRecord resources) { private String sftpToDownloadFile(String sftpFilePath, RemoteRecord resources) {
// 准备sftp配置 // 准备sftp配置
Session session = JschUtil.getSession(REMOTE_TRANS_HOST, REMOTE_TRANS_PORT, REMOTE_TRANS_SSH_USER, REMOTE_TRANS_SSH_PW); Session session = JschUtil.getSession(REMOTE_TRANS_HOST, REMOTE_TRANS_PORT, REMOTE_TRANS_SSH_USER, REMOTE_TRANS_SSH_PW);
@ -114,7 +130,7 @@ public class DownloadSFTPFileTask {
if (StringUtils.isBlank(fullFileName)){ if (StringUtils.isBlank(fullFileName)){
log.error(" ====== [DownloadSFTPFileTask|sftpToDownloadFile, sftp download path is null.] ======"); log.error(" ====== [DownloadSFTPFileTask|sftpToDownloadFile, sftp download path is null.] ======");
resources.setTag(FAIL_TAG); resources.setTag(FAIL_TAG);
return; return "";
} }
String localSavePath = buildFileWritePath(fullFileName); String localSavePath = buildFileWritePath(fullFileName);
resources.setLocalSavePath(localSavePath); resources.setLocalSavePath(localSavePath);
@ -123,6 +139,8 @@ public class DownloadSFTPFileTask {
sftp.download(sftpFilePath, FileUtil.file(localSavePath)); sftp.download(sftpFilePath, FileUtil.file(localSavePath));
sftp.close(); sftp.close();
return localSavePath;
} }
/** /**

Loading…
Cancel
Save