@ -1,13 +1,14 @@
package com.baiye.module.service.impl ;
import cn.hutool.core.codec.Base64 ;
import cn.hutool.core.collection.CollUtil ;
import cn.hutool.core.io.FileUtil ;
import cn.hutool.core.text.StrPool ;
import cn.hutool.core.text.csv.CsvUtil ;
import cn.hutool.core.text.csv.CsvWriter ;
import cn.hutool.core.util.CharsetUtil ;
import cn.hutool.core.util.IdUtil ;
import cn.hutool.core.util.ObjectUtil ;
import cn.hutool.json.JSONUtil ;
import com.baiye.config.properties.DeliveryProperties ;
import com.baiye.constant.AdPlatFormConstants ;
import com.baiye.constant.DefaultNumberConstants ;
@ -23,16 +24,15 @@ import com.baiye.module.entity.Task;
import com.baiye.module.service.ClueService ;
import com.baiye.module.service.DeliveryRecordService ;
import com.baiye.module.service.DmpCallbackService ;
import com.baiye.module.service.dto.ImeiZDTO ;
import com.baiye.module.service.dto.InterfaceResponseDTO ;
import com.baiye.util.* ;
import com.google.common.base.Joiner ;
import com.google.common.base.Splitter ;
import com.google.common.collect.Lists ;
import com.google.common.collect.Sets ;
import lombok.RequiredArgsConstructor ;
import lombok.SneakyThrows ;
import lombok.extern.slf4j.Slf4j ;
import net.lingala.zip4j.exception.ZipException ;
import org.apache.commons.lang3.StringUtils ;
import org.apache.http.HttpStatus ;
import org.springframework.stereotype.Service ;
@ -43,7 +43,6 @@ import java.util.List;
import java.util.Objects ;
import java.util.concurrent.TimeUnit ;
import java.util.stream.Collectors ;
import java.util.stream.Stream ;
/ * *
* @author Enzo
@ -71,48 +70,47 @@ public class DmpCallbackServiceImpl implements DmpCallbackService {
@Override
@SneakyThrows
@Transactional ( rollbackFor = Exception . class )
public CommonResponse < Object > resourceCallback ( InterfaceResponseDTO interfaceResponseDTO ) {
String sign = interfaceResponseDTO . getSign ( ) ;
String bathNo = interfaceResponseDTO . getBathNo ( ) ;
public CommonResponse < Object > resourceCallback ( InterfaceResponseDTO responseDTO ) {
String bathNo = responseDTO . getBatchId ( ) ;
Object objCache = redisUtils . get ( bathNo ) ;
if ( ObjectUtil . isNotNull ( objCache ) ) {
return CommonResponse . createByError ( ) ;
}
interfaceResponseDTO . setSign ( null ) ;
if ( ! RsaUtil . verify ( JSONUtil . toJsonStr ( interfaceResponseDTO ) ,
RsaUtil . getPublicKey ( deliveryProperties . getPublicKey ( ) ) , sign ) ) {
return CommonResponse . createByErrorMessage ( "visa verification failure" ) ;
return CommonResponse . createByErrorCodeMessage
( ResponseCode . BATCH_NUMBER_EXIST_OR_SEND . getCode ( ) ,
ResponseCode . BATCH_NUMBER_REPEAT_TRANSMISSION . getDesc ( ) ) ;
}
// 设置 十 分钟缓存
redisUtils . set ( bathNo , bathNo , DefaultNumberConstants . TEN _NUMBER, TimeUnit . MINUTES ) ;
// 设置五分钟缓存
redisUtils . set ( bathNo , bathNo , DefaultNumberConstants . FIVE_NUMBER , TimeUnit . MINUTES ) ;
DeliveryRecord byTaskName = deliveryRecordService . findByTaskName ( bathNo ) ;
if ( ObjectUtil . isNotNull ( byTaskName )
& & byTaskName . getStatus ( ) = = DefaultNumberConstants . ZERO_NUMBER ) {
log . info ( "================ the receive data taskName as {} ==============" , bathNo ) ;
Long taskId = byTaskName . getTaskId ( ) ;
List < String > tagList = Lists . newArrayList ( ) ;
List < ImeiZDTO > list = interfaceResponseDTO . getImeis ( ) ;
List < String > phoneList = Lists . newArrayList ( ) ;
List < List < String > > originalList = Lists . newArrayList ( ) ;
if ( responseDTO . getResult ( ) = = DefaultNumberConstants . ZERO_NUMBER
& & CollUtil . isNotEmpty ( responseDTO . getDatas ( ) ) ) {
aggregateData ( responseDTO , byTaskName , tagList , phoneList , originalList ) ;
}
if ( ObjectUtil . isNotNull ( byTaskName . getType ( ) ) ) {
List < String > phoneList = list . stream ( ) . map ( ImeiZDTO : : getZid ) . collect ( Collectors . toList ( ) ) ;
// 去除重复数据
tagList = Lists . newArrayList ( Sets . newHashSet ( phoneList ) ) ;
phoneList = Lists . newArrayList ( Sets . newHashSet ( phoneList ) ) ;
List < String > responseList = DecryptPnoUtil . decryptPhoneList ( phoneList ) ;
// 剔除空值
responseList = responseList . stream ( ) . filter ( StringUtils : : isNotBlank ) . collect ( Collectors . toList ( ) ) ;
// 请求黑名单
if ( CollUtil . isNotEmpty ( responseList ) ) {
log . info ( "=================== the responseList size as {} =====================" , responseList . size ( ) ) ;
if ( byTaskName . getType ( ) = = DefaultNumberConstants . ONE_NUMBER ) {
saveClue ( bathNo , byTaskName , taskId , tagList , responseList ) ;
}
if ( byTaskName . getType ( ) = = DefaultNumberConstants . THREE_NUMBER ) {
sendRequest ( bathNo , tagList , responseList ) ;
}
if ( byTaskName . getType ( ) = = DefaultNumberConstants . TWO_NUMBER ) {
downResource ( bathNo , byTaskName , list , responseList ) ;
downResource ( bathNo , byTaskName , originalList , responseList ) ;
}
}
if ( byTaskName . getType ( ) = = DefaultNumberConstants . THREE_NUMBER ) {
sendRequest ( bathNo , tagList , responseList ) ;
}
}
return CommonResponse . createBySuccess ( HttpStatus . SC_OK ) ;
@ -123,7 +121,9 @@ public class DmpCallbackServiceImpl implements DmpCallbackService {
}
private void saveClue ( String taskName , DeliveryRecord byTaskName , Long taskId , List < String > tagList , List < String > decryptList ) {
private void saveClue ( String taskName , DeliveryRecord byTaskName , Long
taskId , List < String > tagList , List < String > decryptList ) {
// 请求黑名单
List < String > isNotblackList = IsBlackUtil . getNotblackList ( decryptList ) ;
// 将imei转为线索
Integer saveNumer = clueService . saveClue ( tagList , isNotblackList ,
@ -146,56 +146,78 @@ public class DmpCallbackServiceImpl implements DmpCallbackService {
}
private void downResource ( String taskName , DeliveryRecord byTaskName , List < ImeiZDTO > list , List < String > responseList ) throws ZipException {
List < String > csvList = Lists . newArrayList ( ) ;
@SneakyThrows
private void downResource ( String taskName , DeliveryRecord
byTaskName , List < List < String > > originalList , List < String > responseList ) {
String uuid = IdUtil . fastSimpleUUID ( ) ;
List < String > csvList = Lists . newArrayList ( ) ;
originalList = Lists . newArrayList ( Sets . newHashSet ( originalList ) ) ;
String file = deliveryProperties . getFileUrl ( ) . concat ( StrPool . SLASH ) . concat ( uuid ) ;
String path = String . valueOf ( redisUtils . get ( CacheKey . DMP_DOWN_URL . concat ( String . valueOf ( byTaskName . getDownId ( ) ) ) ) ) ;
// 查询缓存
if ( StringUtils . isBlank ( path ) ) {
path = deliveryRecordClient . queryPath ( byTaskName . getDownId ( ) , SecurityConstants . FROM_IN ) . getBody ( ) ;
}
Object objPath = redisUtils . get ( CacheKey . DMP_DOWN_URL . concat ( String . valueOf ( byTaskName . getDownId ( ) ) ) ) ;
String path = ObjectUtil . isNotNull ( objPath ) ?
String . valueOf ( objPath ) : deliveryRecordClient . queryPath
( byTaskName . getDownId ( ) , SecurityConstants . FROM_IN ) . getBody ( ) ;
if ( ObjectUtil . isNotNull ( path ) ) {
// 解压文件
String unzipPath =
CompressUtil . unzipFiles ( deliveryProperties . getFileUrl ( ) ,
path , deliveryProperties . getZipPassword ( ) ) ;
String unzipPath = CompressUtil . unzipFiles
( deliveryProperties . getFileUrl ( ) , path , deliveryProperties . getZipPassword ( ) ) ;
File unzipFile = new File ( unzipPath ) ;
File parseFile = Objects . requireNonNull
( unzipFile . listFiles ( ) ) [ DefaultNumberConstants . ZERO_NUMBER ] ;
// 解析文件
for ( int i = 0 ; i < responseList . size ( ) ; i + + ) {
ImeiZDTO dto = list . get ( i ) ;
String encrypt = EncryptUtil . aesEncrypt ( responseList . get ( i ) , AdPlatFormConstants . PLAT_DECRYPTION ) ;
List < String > collect = Stream . of ( dto . getImei ( ) , dto . getTag ( ) , dto . getZid ( ) , encrypt ) . collect ( Collectors . toList ( ) ) ;
String join = Joiner . on ( StrPool . COMMA ) . skipNulls ( ) . join ( collect ) ;
List < String > stringList = Lists . newArrayList ( originalList . get ( i ) ) ;
stringList . add ( EncryptUtil . aesEncrypt ( responseList . get ( i ) , AdPlatFormConstants . PLAT_DECRYPTION ) ) ;
String join = Joiner . on ( StrPool . COMMA ) . skipNulls ( ) . join ( stringList ) ;
csvList . add ( join ) ;
}
FileZipUtil . writeToCsv ( csvList , parseFile . getPath ( ) , Boolean . TRUE ) ;
// 设置压缩文件
CompressUtil . decryptionCompression ( path , parseFile . getPath ( ) , null ) ;
deliveryRecordService . updateStatusByTaskName ( DefaultNumberConstants . ONE_NUMBER , responseList . size ( ) , taskName ) ;
FileUtil . del ( unzipPath ) ;
return ;
}
// 首次加载
String csvPath = file . concat ( StrPool . DOT ) . concat ( FileConstant . CSV_FILE_SUB_NAME ) ;
String zipPath = file . concat ( FileConstant . ZIP_FILE_SUB_NAME ) ;
CsvWriter writer = CsvUtil . getWriter ( csvPath , CharsetUtil . CHARSET_UTF_8 ) ;
String zipPath = file . concat ( FileConstant . ZIP_FILE_SUB_NAME ) ;
for ( int i = 0 ; i < responseList . size ( ) ; i + + ) {
ImeiZDTO dto = list . get ( i ) ;
String encrypt = EncryptUtil . aesEncrypt ( responseList . get ( i ) , AdPlatFormConstants . PLAT_DECRYPTION ) ;
String [ ] objects = Stream . of ( dto . getImei ( ) , dto . getTag ( ) , dto . getZid ( ) , encrypt ) . toArray ( String [ ] : : new ) ;
writer . writeLine ( objects ) ;
List < String > stringList = Lists . newArrayList ( originalList . get ( i ) ) ;
stringList . add ( EncryptUtil . aesEncrypt ( responseList . get ( i ) , AdPlatFormConstants . PLAT_DECRYPTION ) ) ;
writer . writeLine ( stringList . toArray ( new String [ DefaultNumberConstants . ZERO_NUMBER ] ) ) ;
}
writer . close ( ) ;
// 设置压缩文件
CompressUtil . decryptionCompression ( zipPath , csvPath , null ) ;
String filePath = zipPath . substring
( zipPath . lastIndexOf ( StrPool . SLASH ) + DefaultNumberConstants . ONE_NUMBER ) ;
FileUtil . del ( csvPath ) ;
String downUrl = deliveryProperties . getDmpDownPath ( ) . concat ( filePath ) ;
deliveryRecordService . updateStatusByTaskName ( DefaultNumberConstants . ONE_NUMBER , responseList . size ( ) , taskName ) ;
deliveryRecordClient . updatePath ( zipPath , downUrl , byTaskName . getDownId ( ) , SecurityConstants . FROM_IN ) ;
redisUtils . set ( CacheKey . DMP_DOWN_URL . concat ( String . valueOf ( byTaskName . getDownId ( ) ) ) , downUrl , DefaultNumberConstants . ONE_NUMBER , TimeUnit . DAYS ) ;
deliveryRecordService . updateStatusByTaskName ( DefaultNumberConstants . ONE_NUMBER , responseList . size ( ) , taskName ) ;
redisUtils . set ( CacheKey . DMP_DOWN_URL . concat ( String . valueOf ( byTaskName . getDownId ( ) ) ) , zipPath , DefaultNumberConstants . ONE_NUMBER , TimeUnit . DAYS ) ;
}
private static void aggregateData ( InterfaceResponseDTO responseDTO , DeliveryRecord byTaskName , List < String > tagList , List < String > phoneList , List < List < String > > originalList ) {
for ( InterfaceResponseDTO . Data data : responseDTO . getDatas ( ) ) {
List < String > stringList = Splitter . on ( StrPool . COMMA ) . trimResults ( )
. omitEmptyStrings ( ) . splitToList ( Base64 . decodeStr ( data . getData ( ) ) ) ;
if ( byTaskName . getType ( ) = = DefaultNumberConstants . ONE_NUMBER ) {
String type = stringList . get ( DefaultNumberConstants . THREE_NUMBER ) ;
if ( ValidationUtil . isInteger ( type ) ) {
Integer number = Integer . valueOf ( type ) ;
if ( DisplayNumberUtil . getDefaultSkipNumber ( ) . contains ( number ) ) {
continue ;
}
}
}
if ( stringList . size ( ) > DefaultNumberConstants . THREE_NUMBER ) {
originalList . add ( stringList ) ;
tagList . add ( stringList . get ( DefaultNumberConstants . ONE_NUMBER ) ) ;
phoneList . add ( stringList . get ( DefaultNumberConstants . TWO_NUMBER ) ) ;
}
}
}
}