@ -32,6 +32,8 @@ import java.util.List;
import java.util.Map ;
import java.util.Map ;
import java.util.Optional ;
import java.util.Optional ;
import java.util.concurrent.TimeUnit ;
import java.util.concurrent.TimeUnit ;
import java.util.concurrent.locks.Lock ;
import java.util.concurrent.locks.ReentrantLock ;
import java.util.stream.Collectors ;
import java.util.stream.Collectors ;
/ * *
/ * *
@ -43,6 +45,8 @@ import java.util.stream.Collectors;
@RequiredArgsConstructor
@RequiredArgsConstructor
public class DeliveryBalanceTask {
public class DeliveryBalanceTask {
private final Lock lock = new ReentrantLock ( ) ;
private final RedisUtils redisUtils ;
private final RedisUtils redisUtils ;
private final TaskService taskService ;
private final TaskService taskService ;
@ -81,7 +85,7 @@ public class DeliveryBalanceTask {
UserDto userInfo = userService . findUserInfo ( userId ) ;
UserDto userInfo = userService . findUserInfo ( userId ) ;
if ( ObjectUtil . isNotNull ( userInfo )
if ( ObjectUtil . isNotNull ( userInfo )
& & Boolean . TRUE . equals ( userInfo . getEnabled ( ) )
& & Boolean . TRUE . equals ( userInfo . getEnabled ( ) )
& & ObjectUtil . isNotNull ( companyByUserId )
& & ObjectUtil . isNotNull ( companyByUserId )
// 判断价格
// 判断价格
& & ObjectUtil . isNotNull ( companyByUserId . getDmpDeliveryFee ( ) ) ) {
& & ObjectUtil . isNotNull ( companyByUserId . getDmpDeliveryFee ( ) ) ) {
Integer dmpLimitNum = companyByUserId . getDmpLimitNum ( ) ;
Integer dmpLimitNum = companyByUserId . getDmpLimitNum ( ) ;
@ -90,85 +94,94 @@ public class DeliveryBalanceTask {
| | companyByUserId . getUserBalance ( ) < DefaultNumberConstants . TWO_HUNDRED ) {
| | companyByUserId . getUserBalance ( ) < DefaultNumberConstants . TWO_HUNDRED ) {
continue ;
continue ;
}
}
// 截取数据
lock . lock ( ) ;
if ( ObjectUtil . isNotNull ( dmpLimitNum )
try {
& & dmpLimitNum > DefaultNumberConstants . ZERO_NUMBER ) {
// 截取数据
// 统计今天数量
if ( ObjectUtil . isNotNull ( dmpLimitNum )
Integer count = taskImeiService . countByUserIdAndDate ( taskId , DefaultNumberConstants . ONE_NUMBER ) ;
& & dmpLimitNum > DefaultNumberConstants . ZERO_NUMBER ) {
downList = CollUtil . sub ( list , count > =
// 统计今天数量
dmpLimitNum ? DefaultNumberConstants . ZERO_NUMBER : dmpLimitNum , list . size ( ) ) ;
Integer count = taskImeiService . countByUserIdAndDate ( taskId , DefaultNumberConstants . ONE_NUMBER ) ;
list = CollUtil . sub ( list , DefaultNumberConstants . ZERO_NUMBER ,
downList = CollUtil . sub ( list , count > =
count > = dmpLimitNum ? DefaultNumberConstants . ZERO_NUMBER : dmpLimitNum ) ;
dmpLimitNum ? DefaultNumberConstants . ZERO_NUMBER : dmpLimitNum , list . size ( ) ) ;
}
list = CollUtil . sub ( list , DefaultNumberConstants . ZERO_NUMBER ,
count > = dmpLimitNum ? DefaultNumberConstants . ZERO_NUMBER : dmpLimitNum ) ;
if ( CollUtil . isNotEmpty ( list ) ) {
}
// 每次100000 数据
if ( CollUtil . isNotEmpty ( list ) ) {
List < List < TaskImei > > lists = Lists . partition
// 每次100000 数据
( list , DefaultNumberConstants . ONE_HUNDRED_THOUSAND ) ;
List < List < TaskImei > > lists = Lists . partition
// 批量发送
( list , DefaultNumberConstants . ONE_HUNDRED_THOUSAND ) ;
for ( List < TaskImei > taskImeiList : lists ) {
// 批量发送
Long minId = ( long ) DefaultNumberConstants . ZERO_NUMBER ;
for ( List < TaskImei > taskImeiList : lists ) {
Long maxId = ( long ) DefaultNumberConstants . ZERO_NUMBER ;
// 发送邮件并修改状态
// 大于100 进行兑换
sendMailAndUpdateImeiStatus ( taskId , userId , taskImeiList ) ;
if ( taskImeiList . size ( ) > = DefaultNumberConstants . ONE_HUNDRED ) {
String fileUrl = deliveryProperties . getFileUrl ( ) ;
Comparator < TaskImei > comparing = Comparator . comparing ( TaskImei : : getId ) ;
String format = DateUtil . format ( DateUtil . date ( ) , DatePattern . PURE_DATE_PATTERN ) ;
Integer num = ( Integer ) redisUtils . get ( CacheKey . DMP_REQUEST_ID . concat ( format ) ) ;
// 最小id
Optional < TaskImei > min = taskImeiList . stream ( ) . min ( comparing ) ;
if ( min . isPresent ( ) ) {
minId = min . get ( ) . getId ( ) ;
}
// 最大id
Optional < TaskImei > max = taskImeiList . stream ( ) . max ( comparing ) ;
if ( max . isPresent ( ) ) {
maxId = max . get ( ) . getId ( ) ;
}
if ( minId > DefaultNumberConstants . ZERO_NUMBER & &
maxId > DefaultNumberConstants . ZERO_NUMBER ) {
// 转换DTO
List < ImeiDTO > toList = Convert . toList ( ImeiDTO . class , taskImeiList ) ;
Integer batch = ObjectUtil . isNull ( num ) ?
DefaultNumberConstants . ONE_NUMBER : num + DefaultNumberConstants . ONE_NUMBER ;
String taskImeiName = FileConstant . BY . concat ( StrPool . UNDERLINE ) .
concat ( format ) . concat ( StrPool . UNDERLINE ) . concat ( String . valueOf ( batch ) ) ;
String file = fileUrl . concat ( StrPool . SLASH ) . concat ( taskImeiName ) ;
// 保存文件
String csvPath = file . concat ( StrPool . DOT ) . concat ( FileConstant . CSV_FILE_SUB_NAME ) ;
CSVFileUtil . createCsvFile ( toList , csvPath ) ;
String zipPath = file . concat ( FileConstant . ZIP_FILE_SUB_NAME ) ;
// 设置压缩文件
CompressUtil . decryptionCompression ( zipPath , csvPath , deliveryProperties . getZipPassword ( ) ) ;
// 发送邮件
MailUtil . sendMail ( deliveryProperties . getEmailAddress ( ) ,
deliveryProperties . getEmailPassword ( ) , deliveryProperties . getToEmailAddress ( ) , zipPath , taskImeiName ) ;
// 保存发送记录
deliveryRecordService . saveDeliveryRecord ( zipPath , taskImeiName ,
taskImeiList . size ( ) , taskId , userId , DefaultNumberConstants . ONE_NUMBER ) ;
// 保存批次号
redisUtils . set ( CacheKey . DMP_REQUEST_ID . concat ( format ) , batch , DefaultNumberConstants . ONE_NUMBER , TimeUnit . DAYS ) ;
// 批次修改状态
taskImeiService . updateBath ( taskId , CharSequenceUtil . EMPTY , DefaultNumberConstants . ONE_NUMBER , minId , maxId ) ;
// 删除文件
boolean csvResult = FileUtil . del ( csvPath ) ;
boolean zipResult = FileUtil . del ( zipPath ) ;
log . info ( "=========================== the csv path as {}, zip path as {} " + " csvResult as {} zipResult as {} ========================" , csvPath , zipPath , csvResult , zipResult ) ;
}
}
}
}
}
}
if ( CollUtil . isNotEmpty ( downList ) & & downList . size ( ) > DefaultNumberConstants . ONE_HUNDRED ) {
if ( CollUtil . isNotEmpty ( downList ) ) {
// 处理下载包
// 处理下载包
zipDownList ( taskId , downList , companyByUserId ) ;
zipDownList ( taskId , downList , companyByUserId ) ;
}
} finally {
lock . unlock ( ) ;
}
}
}
}
}
}
}
}
}
}
private void sendMailAndUpdateImeiStatus ( Long taskId , Long userId , List < TaskImei > taskImeiList ) {
Long minId = ( long ) DefaultNumberConstants . ZERO_NUMBER ;
Long maxId = ( long ) DefaultNumberConstants . ZERO_NUMBER ;
// 大于100 进行兑换
if ( taskImeiList . size ( ) > = DefaultNumberConstants . ONE_HUNDRED ) {
String fileUrl = deliveryProperties . getFileUrl ( ) ;
Comparator < TaskImei > comparing = Comparator . comparing ( TaskImei : : getId ) ;
String format = DateUtil . format ( DateUtil . date ( ) , DatePattern . PURE_DATE_PATTERN ) ;
Integer num = ( Integer ) redisUtils . get ( CacheKey . DMP_REQUEST_ID . concat ( format ) ) ;
// 最小id
Optional < TaskImei > min = taskImeiList . stream ( ) . min ( comparing ) ;
if ( min . isPresent ( ) ) {
minId = min . get ( ) . getId ( ) ;
}
// 最大id
Optional < TaskImei > max = taskImeiList . stream ( ) . max ( comparing ) ;
if ( max . isPresent ( ) ) {
maxId = max . get ( ) . getId ( ) ;
}
if ( minId > DefaultNumberConstants . ZERO_NUMBER & &
maxId > DefaultNumberConstants . ZERO_NUMBER ) {
// 转换DTO
List < ImeiDTO > toList = Convert . toList ( ImeiDTO . class , taskImeiList ) ;
Integer batch = ObjectUtil . isNull ( num ) ?
DefaultNumberConstants . ONE_NUMBER : num + DefaultNumberConstants . ONE_NUMBER ;
String taskImeiName = FileConstant . BY . concat ( StrPool . UNDERLINE ) .
concat ( format ) . concat ( StrPool . UNDERLINE ) . concat ( String . valueOf ( batch ) ) ;
String file = fileUrl . concat ( StrPool . SLASH ) . concat ( taskImeiName ) ;
// 保存文件
String csvPath = file . concat ( StrPool . DOT ) . concat ( FileConstant . CSV_FILE_SUB_NAME ) ;
CSVFileUtil . createCsvFile ( toList , csvPath ) ;
String zipPath = file . concat ( FileConstant . ZIP_FILE_SUB_NAME ) ;
// 设置压缩文件
CompressUtil . decryptionCompression ( zipPath , csvPath , deliveryProperties . getZipPassword ( ) ) ;
// 发送邮件
MailUtil . sendMail ( deliveryProperties . getEmailAddress ( ) ,
deliveryProperties . getEmailPassword ( ) , deliveryProperties . getToEmailAddress ( ) , zipPath , taskImeiName ) ;
// 保存发送记录
deliveryRecordService . saveDeliveryRecord ( zipPath , taskImeiName ,
taskImeiList . size ( ) , taskId , userId , DefaultNumberConstants . ONE_NUMBER ) ;
// 保存批次号
redisUtils . set ( CacheKey . DMP_REQUEST_ID . concat ( format ) , batch , DefaultNumberConstants . ONE_NUMBER , TimeUnit . DAYS ) ;
// 批次修改状态
taskImeiService . updateBath ( taskId , CharSequenceUtil . EMPTY , DefaultNumberConstants . ONE_NUMBER , minId , maxId ) ;
// 删除文件
boolean csvResult = FileUtil . del ( csvPath ) ;
boolean zipResult = FileUtil . del ( zipPath ) ;
log . info ( "=========================== the csv path as {}, zip path as {} " + " csvResult as {} zipResult as {} ========================" , csvPath , zipPath , csvResult , zipResult ) ;
}
}
}
private void zipDownList ( Long taskId , List < TaskImei > downList , Company companyByUserId ) {
private void zipDownList ( Long taskId , List < TaskImei > downList , Company companyByUserId ) {
log . info ( "============================= down list size as {} =============================" , downList . size ( ) ) ;
log . info ( "============================= down list size as {} =============================" , downList . size ( ) ) ;
Long minId = ( long ) DefaultNumberConstants . ZERO_NUMBER ;
Long minId = ( long ) DefaultNumberConstants . ZERO_NUMBER ;