@ -1,25 +1,96 @@
package me.zhengjie.modules.upload.task ;
import cn.hutool.core.bean.BeanUtil ;
import cn.hutool.core.codec.Base64 ;
import cn.hutool.core.collection.CollectionUtil ;
import cn.hutool.core.util.RandomUtil ;
import cn.hutool.crypto.SecureUtil ;
import cn.hutool.extra.ssh.JschUtil ;
import cn.hutool.extra.ssh.Sftp ;
import cn.hutool.http.HttpRequest ;
import cn.hutool.http.HttpResponse ;
import com.alibaba.fastjson.JSON ;
import com.alibaba.fastjson.JSONArray ;
import com.google.common.collect.Lists ;
import com.jcraft.jsch.Session ;
import com.mchange.v2.beans.BeansUtils ;
import lombok.extern.slf4j.Slf4j ;
import me.zhengjie.modules.upload.domain.UploadFile ;
import me.zhengjie.modules.upload.service.UploadFileService ;
import me.zhengjie.modules.upload.service.dto.UploadFileDto ;
import me.zhengjie.modules.upload.task.model.ResponseEncryptJsonContent ;
import me.zhengjie.modules.upload.task.model.SendEncryptJsonContent ;
import me.zhengjie.modules.upload.task.model.SendRemoteUpdateJsonContent ;
import me.zhengjie.utils.ConvertUtil ;
import me.zhengjie.utils.DateUtil ;
import me.zhengjie.utils.FileUtil ;
import me.zhengjie.utils.StringUtils ;
import org.springframework.beans.BeanUtils ;
import org.springframework.beans.factory.annotation.Autowired ;
import org.springframework.beans.factory.annotation.Value ;
import org.springframework.context.annotation.Scope ;
import org.springframework.scheduling.annotation.Async ;
import org.springframework.stereotype.Component ;
import java.beans.Beans ;
import java.io.IOException ;
import java.nio.file.Files ;
import java.nio.file.Paths ;
import java.nio.file.StandardOpenOption ;
import java.time.LocalDateTime ;
import java.time.ZoneOffset ;
import java.util.Arrays ;
import java.util.List ;
import static me.zhengjie.modules.upload.consts.SysConst.* ;
import static me.zhengjie.modules.upload.consts.UploadFileConst.* ;
@Component
@Scope ( "prototype" )
@Slf4j
public class SaveToFileTask {
/ * *
* 调 用 加 密 接 口 的 加 密 内 容 数 量 限 制
* /
private final static int SEND_ENCRYPT_LIMIT = 200 ;
/ * *
* 时 间 格 式
* /
private final static String FORMATE_TIMESTAMP = "yyyyMMddHHmmss" ;
/ * *
* 分 割 标 识 - 逗 号
* /
private static final String SPLIT_TAG = "," ;
// FIXME: 2021/1/5 0005 传输相关的配置 - 不要写在代码中,想办法进行加密
/ * *
* 加 密 请 求 需 要 的 各 种 配 置 信 息
* /
@Value ( value = "${inter.address}" )
private String encryptAddress ;
@Value ( value = "inter.appid" )
private String encryptAppId ;
@Value ( value = "inter.tk" )
private String encryptTK ;
@Autowired
private UploadFileService uploadFileService ;
@Async ( value = "SendBigDataTaskExecutor" )
public void doRunTask ( String tempFilesPath ) {
public void doRunTask ( UploadFileDto uploadFileDto ) {
Long satrtMilliSecond = LocalDateTime . now ( ) . toInstant ( ZoneOffset . of ( "+8" ) ) . toEpochMilli ( ) ;
log . info ( "====== [ task start running, task name is {} ] ======" , "SendBigDataTask" ) ;
runTask ( tempFilesPath ) ;
runTask ( uploadFileDto ) ;
Long endMilliSecond = LocalDateTime . now ( ) . toInstant ( ZoneOffset . of ( "+8" ) ) . toEpochMilli ( ) ;
log . info ( "====== [ task start end, task name is {},cost milliSecond is {} ] ======" , "SendBigDataTask" , ConvertUtil . secondToTime ( endMilliSecond - satrtMilliSecond ) ) ;
}
@ -27,22 +98,223 @@ public class SaveToFileTask {
/ * *
* 执 行 异 步 任 务
*
* @param tempFilesPath 上 传 的 多 个 临 时 文 件 路 径 ,
* @param uploadFileDto 需 要 传 输 用 到 的 Bean
* /
private void runTask ( String tempFilesPath ) {
// 4. 调用加密接口进行加密
private void runTask ( UploadFileDto uploadFileDto ) {
boolean finalTag = false ;
// 获取需求的地址
String tempFilesPath = uploadFileDto . getLocalSavePath ( ) ;
if ( StringUtils . contains ( tempFilesPath , FILE_PATH_SPLIT ) ) {
// 多文件分文件分别进行处理
String [ ] split = StringUtils . split ( tempFilesPath , FILE_PATH_SPLIT ) ;
if ( split . length > 0 ) {
for ( String eachPath : split ) {
finalTag = handleEachFileContent ( eachPath , uploadFileDto ) ;
}
}
} else {
// 单文件处理
finalTag = handleEachFileContent ( tempFilesPath , uploadFileDto ) ;
}
// 更新状态为成功,更新解析成功的条数
UploadFile uploadFile = new UploadFile ( ) ;
if ( finalTag ) {
BeanUtils . copyProperties ( uploadFileDto , uploadFile ) ;
uploadFile . setUploadTag ( SUCCESS_TAG ) ;
uploadFileService . update ( uploadFile ) ;
} else {
// 失败进行容错
BeanUtils . copyProperties ( uploadFileDto , uploadFile ) ;
uploadFile . setUploadTag ( FAIL_TAG ) ;
uploadFileService . update ( uploadFile ) ;
}
}
/ * *
* 对 每 一 个 文 件 的 处 理 操 作
*
* @param filePath 进 行 操 作 的 每 一 个 文 件 的 路 径
* /
private boolean handleEachFileContent ( String filePath , UploadFileDto uploadFileDto ) {
List < String > fileAllLinesList = FileUtil . readLines ( filePath , "utf-8" ) ;
if ( CollectionUtil . isNotEmpty ( fileAllLinesList ) ) {
// 分批调用接口进行加密
batchSendToEncrypt ( fileAllLinesList ) ;
}
// 把临时存储的文件进行删除
boolean delFileTag = delTempSaveFile ( filePath ) ;
// 文件传输给2号库服务器 - 走sftp协议 - 支持断点续传
transFileToOtherServer ( filePath ) ;
// 调用远程接口完成一条记录更新
boolean sendUpdatePostReqTag = sendUpdatePostReq ( filePath , uploadFileDto ) ;
// fixme 这里要修改之前的平台给一个更新接口,然后这边可以用rpc调用,也可以用http,也可以考虑直接消息中间件进行解耦
if ( delFileTag & & sendUpdatePostReqTag ) {
return Boolean . TRUE ;
}
return Boolean . FALSE ;
}
private void transFileToOtherServer ( String filePath ) {
Session session = JschUtil . getSession ( REMOTE_TRANS_HOST , REMOTE_TRANS_PORT , REMOTE_TRANS_SSH_USER , REMOTE_TRANS_SSH_PW ) ;
Sftp sftp = JschUtil . createSftp ( session ) ;
//fixme sftp.put("C:\\Users\\Administrator\\Desktop\\233.txt", "/home", Sftp.Mode.RESUME);
sftp . put ( filePath , REMOTE_TRANS_DIR_PATH , Sftp . Mode . RESUME ) ;
sftp . close ( ) ;
}
private boolean delTempSaveFile ( String tempFilesPath ) {
boolean del = FileUtil . del ( tempFilesPath ) ;
if ( del ) {
log . info ( "======== [success del file, file path is {} ] ========" , tempFilesPath ) ;
return Boolean . TRUE ;
} else {
log . error ( "======== [fail del file, file path is {} ] ========" , tempFilesPath ) ;
return Boolean . FALSE ;
}
}
private void batchSendToEncrypt ( List < String > fileAllLinesList ) {
List < List < String > > partition = Lists . partition ( fileAllLinesList , SEND_ENCRYPT_LIMIT ) ;
partition . forEach (
list - > {
// 装配需要的Json参数
SendEncryptJsonContent sendEncryptJsonContent = new SendEncryptJsonContent ( ) ;
//fixme 还有一个过期时间参数为选填参数,暂时不做设置
String tels = StringUtils . listPrintWithSpecialSplit ( list , null ) ;
sendEncryptJsonContent . setTels ( Base64 . encode ( tels ) ) ;
sendEncryptJsonContent . setReqId ( RandomUtil . randomString ( 10 ) ) ;
sendEncryptJsonContent . setAppId ( encryptAppId ) ;
// 配置 sign
String signStr = makeSign ( ) ;
if ( StringUtils . isBlank ( signStr ) ) {
log . error ( "SaveToFileTask|makeSign fail!" ) ;
}
sendEncryptJsonContent . setSig ( signStr ) ;
// 组装成JSON
String readSendJsonStr = JSON . toJSONString ( sendEncryptJsonContent ) ;
log . info ( "SaveToFileTask|batchSendToEncrypt ready send json is : {}" , readSendJsonStr ) ;
int count = 1 ;
while ( count < = 3 ) {
// 调用HTTP请求发送数据
HttpResponse httpResponse = sendPostReq ( readSendJsonStr ) ;
if ( httpResponse . isOk ( ) & & httpResponse . body ( ) . contains ( "success" ) ) {
log . info ( "========== [SaveToFileTask|batchSendToEncrypt request success, response is {} ] ==========" , httpResponse . body ( ) ) ;
String responseStr = httpResponse . body ( ) ;
// 解析返回的结果,并写回本地
parseResponseStr ( responseStr ) ;
break ;
} else {
count + + ;
try {
Thread . sleep ( 3_0000 ) ;
} catch ( InterruptedException e ) {
log . error ( "SaveToFileTask|batchSendToEncrypt sleep ERROR. message is" , e . getMessage ( ) , e ) ;
}
log . error ( "========== [SaveToFileTask|batchSendToEncrypt request fail, response is {} ] ==========" , httpResponse . body ( ) ) ;
}
}
if ( count > 3 ) {
log . error ( "========== [SaveToFileTask|batchSendToEncrypt update send status fail, url is {} ] ==========" , encryptAddress ) ;
}
}
) ;
}
private void parseResponseStr ( String responseStr ) {
ResponseEncryptJsonContent responseContent = JSONArray . parseObject ( responseStr , ResponseEncryptJsonContent . class ) ;
// 处理需要的加密号串
String tels = responseContent . getTels ( ) ;
// 4.1 失败进行容错
String [ ] splitResTels = StringUtils . split ( tels , SPLIT_TAG ) ;
// 4.2 把临时存储的文件进行删除
if ( splitResTels ! = null & & splitResTels . length > 0 ) {
List < String > stringList = Arrays . asList ( splitResTels ) ;
// TODO: 2021/1/5 0005 这里保存文件的路径进行定义下
String path = "" ;
// 写入指定路径
writeToFile ( stringList , path ) ;
}
// 4.3 更新状态为成功,更新解析成功的条数
}
// 5. 文件传输给2号库服务器
private void writeToFile ( List < String > collect , String fullPath ) {
// 构建存储文件
try {
if ( ! FileUtil . exist ( fullPath ) ) {
Files . write ( Paths . get ( fullPath ) , collect , StandardOpenOption . CREATE_NEW ) ;
} else {
Files . write ( Paths . get ( fullPath ) , collect , StandardOpenOption . APPEND ) ;
}
} catch ( IOException e ) {
log . error ( "write prepare send file fail, please check param, fullPath is {}, {}" , fullPath , e ) ;
}
}
private String makeSign ( ) {
String signBuilder = encryptAppId +
encryptTK +
DateUtil . localDateTimeFormat ( LocalDateTime . now ( ) , FORMATE_TIMESTAMP ) ;
return SecureUtil . md5 ( signBuilder ) . toUpperCase ( ) ;
}
/ * *
* 调 用 HTTP 请 求 发 送 Post 请 求
*
* @param json 请 求 的 body 内 容
* @return 返 回 请 求 结 果
* /
private HttpResponse sendPostReq ( String json ) {
HttpResponse httpResponse = HttpRequest
. post ( encryptAddress )
. header ( "Content-Type" , "application/json;charset=utf-8" )
. body ( json )
. execute ( ) ;
return httpResponse ;
}
/ * *
* 调 用 HTTP 请 求 发 送 更 新 记 录 的 Post 请 求
*
* @param path
* @return 返 回 请 求 结 果
* /
private boolean sendUpdatePostReq ( String path , UploadFileDto uploadFileDto ) {
// 构建发送参数
SendRemoteUpdateJsonContent sendRemoteUpdateJsonContent = new SendRemoteUpdateJsonContent ( ) ;
BeanUtil . copyProperties ( uploadFileDto , sendRemoteUpdateJsonContent ) ;
// 转成Json字符串
String readySendJson = JSON . toJSONString ( sendRemoteUpdateJsonContent ) ;
// 5.1 文件传输和记录更新()
// 发送请求
HttpResponse httpResponse = HttpRequest
. post ( REMOTE_UPDATE_ADDR )
. header ( "Content-Type" , "application/json;charset=utf-8" )
. body ( readySendJson )
. execute ( ) ;
// 解析回送请求
if ( httpResponse . isOk ( ) & & httpResponse . body ( ) . contains ( "SUCCESS" ) ) {
log . info ( "====== [success send upload record request! ]======" ) ;
return Boolean . TRUE ;
}
return Boolean . FALSE ;
}
}