|
|
@ -16,7 +16,6 @@ import me.zhengjie.modules.tag.domain.Tag;
|
|
|
|
import me.zhengjie.modules.tag.service.TagService;
|
|
|
|
import me.zhengjie.modules.tag.service.TagService;
|
|
|
|
import me.zhengjie.modules.tag.service.dto.TagDto;
|
|
|
|
import me.zhengjie.modules.tag.service.dto.TagDto;
|
|
|
|
import me.zhengjie.modules.tag.service.dto.TagQueryCriteria;
|
|
|
|
import me.zhengjie.modules.tag.service.dto.TagQueryCriteria;
|
|
|
|
import me.zhengjie.modules.taskrecord.domain.TaskRecord;
|
|
|
|
|
|
|
|
import me.zhengjie.modules.taskrecord.service.TaskRecordService;
|
|
|
|
import me.zhengjie.modules.taskrecord.service.TaskRecordService;
|
|
|
|
import me.zhengjie.modules.taskrecord.service.dto.TaskRecordDto;
|
|
|
|
import me.zhengjie.modules.taskrecord.service.dto.TaskRecordDto;
|
|
|
|
import me.zhengjie.modules.taskrecord.service.dto.TaskRecordQueryCriteria;
|
|
|
|
import me.zhengjie.modules.taskrecord.service.dto.TaskRecordQueryCriteria;
|
|
|
@ -26,6 +25,8 @@ import me.zhengjie.utils.StringUtils;
|
|
|
|
import org.springframework.beans.factory.annotation.Autowired;
|
|
|
|
import org.springframework.beans.factory.annotation.Autowired;
|
|
|
|
import org.springframework.beans.factory.annotation.Value;
|
|
|
|
import org.springframework.beans.factory.annotation.Value;
|
|
|
|
import org.springframework.context.annotation.Scope;
|
|
|
|
import org.springframework.context.annotation.Scope;
|
|
|
|
|
|
|
|
import org.springframework.data.domain.PageRequest;
|
|
|
|
|
|
|
|
import org.springframework.data.domain.Pageable;
|
|
|
|
import org.springframework.scheduling.annotation.Async;
|
|
|
|
import org.springframework.scheduling.annotation.Async;
|
|
|
|
import org.springframework.stereotype.Component;
|
|
|
|
import org.springframework.stereotype.Component;
|
|
|
|
|
|
|
|
|
|
|
@ -34,10 +35,8 @@ import java.nio.file.Files;
|
|
|
|
import java.nio.file.Paths;
|
|
|
|
import java.nio.file.Paths;
|
|
|
|
import java.time.LocalDateTime;
|
|
|
|
import java.time.LocalDateTime;
|
|
|
|
import java.time.ZoneOffset;
|
|
|
|
import java.time.ZoneOffset;
|
|
|
|
import java.util.ArrayList;
|
|
|
|
import java.util.*;
|
|
|
|
import java.util.Collections;
|
|
|
|
import java.util.concurrent.atomic.AtomicLong;
|
|
|
|
import java.util.List;
|
|
|
|
|
|
|
|
import java.util.Set;
|
|
|
|
|
|
|
|
import java.util.stream.Collectors;
|
|
|
|
import java.util.stream.Collectors;
|
|
|
|
|
|
|
|
|
|
|
|
@Component
|
|
|
|
@Component
|
|
|
@ -53,7 +52,7 @@ public class SendBigDataTask {
|
|
|
|
/**
|
|
|
|
/**
|
|
|
|
* 每次发送条数限制
|
|
|
|
* 每次发送条数限制
|
|
|
|
*/
|
|
|
|
*/
|
|
|
|
private static final int SEND_LIMIT = 500;
|
|
|
|
private static final int SEND_LIMIT = 5000;
|
|
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
/**
|
|
|
|
* 完成发送任务标识
|
|
|
|
* 完成发送任务标识
|
|
|
@ -63,7 +62,7 @@ public class SendBigDataTask {
|
|
|
|
/**
|
|
|
|
/**
|
|
|
|
* 用于随机选的环境代替字符, 如果添加环境就进行自动添加
|
|
|
|
* 用于随机选的环境代替字符, 如果添加环境就进行自动添加
|
|
|
|
*/
|
|
|
|
*/
|
|
|
|
public static final String BASE_URL_CHAR_NUMBER = "12345";
|
|
|
|
public static final String BASE_URL_CHAR_NUMBER = "123456";
|
|
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
/**
|
|
|
|
* 下游发送url
|
|
|
|
* 下游发送url
|
|
|
@ -97,20 +96,23 @@ public class SendBigDataTask {
|
|
|
|
private void runTask(BuildRecord resource, SendBigDataDTO sendBigDataDTO) {
|
|
|
|
private void runTask(BuildRecord resource, SendBigDataDTO sendBigDataDTO) {
|
|
|
|
// 根据发送任务的Id来读取发送号码表
|
|
|
|
// 根据发送任务的Id来读取发送号码表
|
|
|
|
Integer id = resource.getId();
|
|
|
|
Integer id = resource.getId();
|
|
|
|
if (id <= 0 ){
|
|
|
|
log.info("id: {} ", id);
|
|
|
|
|
|
|
|
if (id == null || id <= 0 ){
|
|
|
|
|
|
|
|
log.error("====== { ERROR , resource body is {} } =======" , resource.toString());
|
|
|
|
return;
|
|
|
|
return;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
BuildRecordDto buildRecordDto = buildRecordService.findById(id);
|
|
|
|
BuildRecordDto buildRecordDto = buildRecordService.findById(id);
|
|
|
|
|
|
|
|
buildRecordDto.setTaskBuildId(resource.getTaskBuildId());
|
|
|
|
// 查询记录表,返回这个id去发送记录表中进行查找 - 构建发送体
|
|
|
|
// 查询记录表,返回这个id去发送记录表中进行查找 - 构建发送体
|
|
|
|
TagQueryCriteria tagQueryCriteria = new TagQueryCriteria();
|
|
|
|
TagQueryCriteria tagQueryCriteria = new TagQueryCriteria();
|
|
|
|
tagQueryCriteria.setTaskId(Long.valueOf(id));
|
|
|
|
tagQueryCriteria.setTaskId(Long.valueOf(id));
|
|
|
|
// fixme 这里可以分页进行查询 - 目前不进行优化 - 为全查
|
|
|
|
tagQueryCriteria.setPushStatus(NON_FINISH_SEND_STATATUS);
|
|
|
|
List<TagDto> tagDtos = tagService.queryAll(tagQueryCriteria);
|
|
|
|
// 加入分页防止查不动的情况 这里可以分页进行查询 - 目前不进行优化 - 为全查
|
|
|
|
|
|
|
|
Pageable pageable = PageRequest.of(0, sendBigDataDTO.getLimit().intValue());
|
|
|
|
|
|
|
|
List<Tag> backContent = tagService.queryAllBySlice(tagQueryCriteria, pageable);
|
|
|
|
// 遍历查询等待发送的列表
|
|
|
|
// 遍历查询等待发送的列表
|
|
|
|
List<TagDto> collect = tagDtos.stream()
|
|
|
|
List<Tag> collect = backContent.stream()
|
|
|
|
.filter(one -> one.getPushStatus() == NON_FINISH_SEND_STATATUS)
|
|
|
|
|
|
|
|
.distinct()
|
|
|
|
.distinct()
|
|
|
|
.limit(sendBigDataDTO.getLimit())
|
|
|
|
|
|
|
|
.collect(Collectors.toList());
|
|
|
|
.collect(Collectors.toList());
|
|
|
|
// 进行去重
|
|
|
|
// 进行去重
|
|
|
|
String resultFilePath = null;
|
|
|
|
String resultFilePath = null;
|
|
|
@ -132,30 +134,34 @@ public class SendBigDataTask {
|
|
|
|
log.error("================== {read file error , please check is , file path is : {} } ================================", resultFilePath, e);
|
|
|
|
log.error("================== {read file error , please check is , file path is : {} } ================================", resultFilePath, e);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
// 过滤的集合
|
|
|
|
// 过滤的集合
|
|
|
|
Set<String> filterCollect = collect.stream().map(TagDto::getUid).filter(fileLines::contains).collect(Collectors.toSet());
|
|
|
|
Set<String> filterCollect = collect.stream().map(Tag::getUid).filter(fileLines::contains).collect(Collectors.toSet());
|
|
|
|
collect = collect.stream().filter(one -> !filterCollect.contains(one.getUid())).collect(Collectors.toList());
|
|
|
|
collect = collect.stream().filter(one -> !filterCollect.contains(one.getUid())).collect(Collectors.toList());
|
|
|
|
}
|
|
|
|
}
|
|
|
|
// 乱序
|
|
|
|
// 乱序
|
|
|
|
// Collections.shuffle(collect);
|
|
|
|
// Collections.shuffle(collect);
|
|
|
|
// 对需要发送的字段进行发送
|
|
|
|
// 对需要发送的字段进行发送
|
|
|
|
batchSend(collect, sendBigDataDTO, buildRecordDto);
|
|
|
|
AtomicLong atomicLong = batchSend(collect, sendBigDataDTO, buildRecordDto);
|
|
|
|
// 对发送后的状态进行更新
|
|
|
|
// 对发送后的状态进行更新
|
|
|
|
resource.setIsSend(FINISH_SEND_TAG);
|
|
|
|
buildRecordDto.setIsSend(FINISH_SEND_TAG);
|
|
|
|
|
|
|
|
BuildRecord buildRecord = new BuildRecord();
|
|
|
|
try{
|
|
|
|
try{
|
|
|
|
Long sendTotal = buildRecordDto.getSendTotal();
|
|
|
|
Long sendTotal = atomicLong.get();
|
|
|
|
if (sendTotal == null){
|
|
|
|
|
|
|
|
sendTotal = 0L;
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
// 之前要进行校验
|
|
|
|
// 之前要进行校验
|
|
|
|
resource.setSendTotal( sendTotal + sendBigDataDTO.getLimit());
|
|
|
|
Long dbCount = buildRecordDto.getSendTotal();
|
|
|
|
buildRecordService.update(resource);
|
|
|
|
if (dbCount == null){
|
|
|
|
|
|
|
|
dbCount = 0L;
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
buildRecordDto.setSendTotal( sendTotal + dbCount);
|
|
|
|
|
|
|
|
BeanUtil.copyProperties(buildRecordDto, buildRecord);
|
|
|
|
|
|
|
|
buildRecordService.update(buildRecord);
|
|
|
|
}catch (Exception e){
|
|
|
|
}catch (Exception e){
|
|
|
|
log.error("==== [ update buildRecord fail, please check ] ====",e);
|
|
|
|
log.error("==== [ update buildRecord fail, buildRecord is {}, please check ] ====", buildRecord.toString() ,e);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
private void batchSend(List<TagDto> collect, SendBigDataDTO sendRecordDTO, BuildRecordDto buildRecordDto) {
|
|
|
|
private AtomicLong batchSend(List<Tag> collect, SendBigDataDTO sendRecordDTO, BuildRecordDto buildRecordDto) {
|
|
|
|
List<List<TagDto>> partition = Lists.partition(collect, SEND_LIMIT);
|
|
|
|
AtomicLong successCount = new AtomicLong(0L);
|
|
|
|
|
|
|
|
List<List<Tag>> partition = Lists.partition(collect, SEND_LIMIT);
|
|
|
|
partition.forEach(
|
|
|
|
partition.forEach(
|
|
|
|
list->{
|
|
|
|
list->{
|
|
|
|
// 调用推送地址进行推送
|
|
|
|
// 调用推送地址进行推送
|
|
|
@ -169,8 +175,6 @@ public class SendBigDataTask {
|
|
|
|
PushDBJsonContent.Client client = new PushDBJsonContent.Client();
|
|
|
|
PushDBJsonContent.Client client = new PushDBJsonContent.Client();
|
|
|
|
client.setCellphone(each.getUid());
|
|
|
|
client.setCellphone(each.getUid());
|
|
|
|
clientList.add(client);
|
|
|
|
clientList.add(client);
|
|
|
|
// 推送完成后修改状态为 已经推送
|
|
|
|
|
|
|
|
each.setPushStatus(FINISH_SEND_TAG);
|
|
|
|
|
|
|
|
}
|
|
|
|
}
|
|
|
|
);
|
|
|
|
);
|
|
|
|
pushDBJsonContent.setClientList(clientList);
|
|
|
|
pushDBJsonContent.setClientList(clientList);
|
|
|
@ -180,10 +184,10 @@ public class SendBigDataTask {
|
|
|
|
log.info("============ [ Pre send Json is : {} ] ============", jsonStr);
|
|
|
|
log.info("============ [ Pre send Json is : {} ] ============", jsonStr);
|
|
|
|
int count = 1;
|
|
|
|
int count = 1;
|
|
|
|
// 失败重发请求3次
|
|
|
|
// 失败重发请求3次
|
|
|
|
|
|
|
|
String address ="";
|
|
|
|
while (count <= 3){
|
|
|
|
while (count <= 3){
|
|
|
|
// 对发送请求地址进行准备
|
|
|
|
// 对发送请求地址进行准备
|
|
|
|
String addressTag = sendRecordDTO.getAddressTag();
|
|
|
|
String addressTag = sendRecordDTO.getAddressTag();
|
|
|
|
String address ="";
|
|
|
|
|
|
|
|
if (StringUtils.isNotBlank(addressTag)){
|
|
|
|
if (StringUtils.isNotBlank(addressTag)){
|
|
|
|
address = preSendReqAddress(addressTag);
|
|
|
|
address = preSendReqAddress(addressTag);
|
|
|
|
log.info("========== [DB request address is {} ] =========", address);
|
|
|
|
log.info("========== [DB request address is {} ] =========", address);
|
|
|
@ -192,6 +196,14 @@ public class SendBigDataTask {
|
|
|
|
HttpResponse httpResponse = HttpUtil.sendPostReq(address, jsonStr);
|
|
|
|
HttpResponse httpResponse = HttpUtil.sendPostReq(address, jsonStr);
|
|
|
|
if (httpResponse.isOk() && httpResponse.body().contains("true")){
|
|
|
|
if (httpResponse.isOk() && httpResponse.body().contains("true")){
|
|
|
|
log.info("========== [DB request success, response is {} ] ==========", httpResponse.body());
|
|
|
|
log.info("========== [DB request success, response is {} ] ==========", httpResponse.body());
|
|
|
|
|
|
|
|
successCount.addAndGet(list.size());
|
|
|
|
|
|
|
|
// 把修改完状态的进行更新
|
|
|
|
|
|
|
|
List<Long> pushIds = list.stream().map(Tag::getId).collect(Collectors.toList());
|
|
|
|
|
|
|
|
Long taskId = list.get(0).getTaskId();
|
|
|
|
|
|
|
|
Integer aLong = tagService.updateAllPushStatus(pushIds, taskId);
|
|
|
|
|
|
|
|
if (aLong <= 0){
|
|
|
|
|
|
|
|
log.error("================= [update data fail , please check it .] =================");
|
|
|
|
|
|
|
|
}
|
|
|
|
break;
|
|
|
|
break;
|
|
|
|
}else{
|
|
|
|
}else{
|
|
|
|
count ++;
|
|
|
|
count ++;
|
|
|
@ -199,25 +211,12 @@ public class SendBigDataTask {
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
if (count > 3) {
|
|
|
|
if (count > 3) {
|
|
|
|
log.error("========== [DB update send status fail, url is {} ] ==========", url);
|
|
|
|
log.error("========== [DB update send status fail, url is {} ] ==========", address);
|
|
|
|
return;
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
}
|
|
|
|
}
|
|
|
|
List<Tag> tags = new ArrayList<>();
|
|
|
|
|
|
|
|
collect.forEach(
|
|
|
|
|
|
|
|
one ->{
|
|
|
|
|
|
|
|
Tag tag = new Tag();
|
|
|
|
|
|
|
|
BeanUtil.copyProperties(one, tag);
|
|
|
|
|
|
|
|
tags.add(tag);
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
);
|
|
|
|
|
|
|
|
// 把修改完状态的进行更新
|
|
|
|
|
|
|
|
Integer aLong = tagService.saveAll(tags);
|
|
|
|
|
|
|
|
if (aLong <= 0){
|
|
|
|
|
|
|
|
log.error("================= [update data fail , please check it .] =================");
|
|
|
|
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
);
|
|
|
|
);
|
|
|
|
|
|
|
|
return successCount;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
private String preSendReqAddress(String tag) {
|
|
|
|
private String preSendReqAddress(String tag) {
|
|
|
|