代码暂存

master
土豆兄弟 4 years ago
parent a641a2ab0c
commit 6ca22893c2

@ -22,7 +22,7 @@ public class HttpUtil extends cn.hutool.http.HttpUtil {
* @param json body
* @return
*/
private HttpResponse sendPostReq(String url, String json){
public static HttpResponse sendPostReq(String url, String json){
HttpResponse httpResponse = HttpRequest
.post(url)
.body(json)

@ -18,7 +18,10 @@ package me.zhengjie.modules.abmessage.service;
import me.zhengjie.modules.abmessage.domain.AbMessage;
import me.zhengjie.modules.abmessage.service.dto.AbMessageDto;
import me.zhengjie.modules.abmessage.service.dto.AbMessageQueryCriteria;
import org.springframework.data.domain.Example;
import org.springframework.data.domain.Pageable;
import org.springframework.data.domain.Slice;
import java.util.Map;
import java.util.List;
import java.io.IOException;
@ -47,6 +50,14 @@ public interface AbMessageService {
*/
List<AbMessageDto> queryAll(AbMessageQueryCriteria criteria);
/**
*
*
* @param example
* @param pageable
* @return
*/
Slice<AbMessage> queryAll(Example example, Pageable pageable);
/**
* ID
* @param id ID

@ -15,6 +15,7 @@
*/
package me.zhengjie.modules.abmessage.service.impl;
import com.google.common.collect.Lists;
import me.zhengjie.modules.abmessage.domain.AbMessage;
import me.zhengjie.utils.ValidationUtil;
import me.zhengjie.utils.FileUtil;
@ -24,12 +25,16 @@ import me.zhengjie.modules.abmessage.service.AbMessageService;
import me.zhengjie.modules.abmessage.service.dto.AbMessageDto;
import me.zhengjie.modules.abmessage.service.dto.AbMessageQueryCriteria;
import me.zhengjie.modules.abmessage.service.mapstruct.AbMessageMapper;
import org.springframework.data.domain.Example;
import org.springframework.data.domain.Slice;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.Pageable;
import me.zhengjie.utils.PageUtil;
import me.zhengjie.utils.QueryHelp;
import org.springframework.util.CollectionUtils;
import java.util.List;
import java.util.Map;
import java.io.IOException;
@ -110,11 +115,22 @@ public class AbMessageServiceImpl implements AbMessageService {
FileUtil.downloadExcel(list, response);
}
/**
*
* @param example
* @param pageable
* @return
*/
@Override
public Slice<AbMessage> queryAll(Example example, Pageable pageable) {
return abMessageRepository.findAll(example, pageable);
}
@Override
@Deprecated
public boolean buildSendTask(List<AbMessageDto> abMessageDtos) {
// 记录待发送的相关信息
return false;
}
}

@ -15,10 +15,14 @@
*/
package me.zhengjie.modules.form.service;
import me.zhengjie.modules.abmessage.domain.AbMessage;
import me.zhengjie.modules.form.domain.FormMessage;
import me.zhengjie.modules.form.service.dto.FormMessageDto;
import me.zhengjie.modules.form.service.dto.FormMessageQueryCriteria;
import org.springframework.data.domain.Example;
import org.springframework.data.domain.Pageable;
import org.springframework.data.domain.Slice;
import java.util.Map;
import java.util.List;
import java.io.IOException;
@ -47,6 +51,15 @@ public interface FormMessageService {
*/
List<FormMessageDto> queryAll(FormMessageQueryCriteria criteria);
/**
*
*
* @param example
* @param pageable
* @return
*/
Slice<FormMessage> queryAll(Example example, Pageable pageable);
/**
* ID
* @param id ID

@ -24,6 +24,8 @@ import me.zhengjie.modules.form.service.FormMessageService;
import me.zhengjie.modules.form.service.dto.FormMessageDto;
import me.zhengjie.modules.form.service.dto.FormMessageQueryCriteria;
import me.zhengjie.modules.form.service.mapstruct.FormMessageMapper;
import org.springframework.data.domain.Example;
import org.springframework.data.domain.Slice;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.data.domain.Page;
@ -61,6 +63,11 @@ public class FormMessageServiceImpl implements FormMessageService {
return formMessageMapper.toDto(formMessageRepository.findAll((root, criteriaQuery, criteriaBuilder) -> QueryHelp.getPredicate(root,criteria,criteriaBuilder)));
}
@Override
public Slice<FormMessage> queryAll(Example example, Pageable pageable) {
return formMessageRepository.findAll(example, pageable);
}
@Override
@Transactional
public FormMessageDto findById(Long id) {

@ -15,15 +15,21 @@
*/
package me.zhengjie.modules.taskrecord.rest;
import cn.hutool.core.util.NumberUtil;
import me.zhengjie.annotation.Log;
import me.zhengjie.common.CommonResponse;
import me.zhengjie.common.ResponseCode;
import me.zhengjie.modules.taskrecord.domain.TaskRecord;
import me.zhengjie.modules.taskrecord.rest.vo.TaskRecordBuildVO;
import me.zhengjie.modules.taskrecord.rest.vo.TaskRecordMergeVO;
import me.zhengjie.modules.taskrecord.rest.vo.TaskRecordSendVO;
import me.zhengjie.modules.taskrecord.service.TaskRecordService;
import me.zhengjie.modules.taskrecord.service.dto.TaskRecordDto;
import me.zhengjie.modules.taskrecord.service.dto.TaskRecordQueryCriteria;
import me.zhengjie.task.MergeRecordFilesTask;
import me.zhengjie.task.ProduceLocalFileTask;
import me.zhengjie.task.SendRecordTask;
import me.zhengjie.utils.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.domain.Pageable;
import lombok.RequiredArgsConstructor;
@ -55,6 +61,8 @@ public class TaskRecordController {
private SendRecordTask sendRecordTask;
@Autowired
private ProduceLocalFileTask produceLocalFileTask;
@Autowired
private MergeRecordFilesTask mergeRecordFilesTask;
@Log("导出数据")
@ApiOperation("导出数据")
@ -104,36 +112,44 @@ public class TaskRecordController {
/**
*
*
* @param resources
* @param taskRecordBuildVO VO
* @return
*/
@PostMapping("/buildTask")
@Log("新建课包任务")
@ApiOperation("新建课包任务")
@PreAuthorize("@el.check('taskRecord:add')")
public ResponseEntity<Object> buildTask(@Validated @RequestBody TaskRecord resources){
public ResponseEntity<Object> buildTask(@RequestBody TaskRecordBuildVO taskRecordBuildVO){
TaskRecord resources = taskRecordBuildVO.getResources();
String tag = taskRecordBuildVO.getTag();
if (StringUtils.isBlank(tag)){
return new ResponseEntity<>(CommonResponse.createByError(ResponseCode.EMPTY_ARGUMENT), HttpStatus.OK);
}
// 启动建立课包任务
produceLocalFileTask.doRunTask(resources);
produceLocalFileTask.doRunTask(resources, tag);
// 返回结果
return new ResponseEntity<>(CommonResponse.createBySuccess(), HttpStatus.OK);
}
/**
* ,,
* @param idList
* @return
*
* @param taskRecordSendVO VO
* @return
*/
@Log("发送课包任务")
@ApiOperation("发送课包任务")
@PreAuthorize("@el.check('taskRecord:list')")
@PostMapping(value = "/sendTask")
public ResponseEntity<Object> sendTask(@RequestParam(value = "ids") List<Integer> idList){
if (CollectionUtils.isEmpty(idList)){
public ResponseEntity<Object> sendTask(@RequestBody TaskRecordSendVO taskRecordSendVO){
// 参数校验
Integer id = taskRecordSendVO.getId();
Long limit = taskRecordSendVO.getLimit();
if (id == null || limit == null){
return new ResponseEntity<>(CommonResponse.createByError(ResponseCode.EMPTY_ARGUMENT), HttpStatus.OK);
}
// 调用发送课包任务,开始发送
sendRecordTask.doRunTask(idList);
sendRecordTask.doRunTask(id, limit);
return new ResponseEntity<>(CommonResponse.createBySuccess(), HttpStatus.OK);
}
@ -142,7 +158,7 @@ public class TaskRecordController {
*
* @param sendTotal
* @param id Id
* @return
* @return
*/
@Log("发送课包数目校验")
@ApiOperation("发送课包数目校验")
@ -174,12 +190,23 @@ public class TaskRecordController {
return Boolean.FALSE;
}
/**
*
*
* @param taskRecordMergeVO VO
* @return
*/
@Log("任务课包任务合并")
@ApiOperation("任务课包任务合并")
@PreAuthorize("@el.check('taskRecord:list')")
@PostMapping(value = "/mergeRecord")
public ResponseEntity<Object> mergeRecord(@RequestBody Integer[] ids, @RequestBody TaskRecord taskRecord){
// TODO: 2020/9/8 0008 发送任务在相关逻辑类中实现
public ResponseEntity<Object> mergeRecord(@RequestBody TaskRecordMergeVO taskRecordMergeVO){
List<Integer> idList = taskRecordMergeVO.getIdList();
String type = taskRecordMergeVO.getType();
if (CollectionUtils.isEmpty(idList) || StringUtils.isBlank(type)){
return new ResponseEntity<>(CommonResponse.createByError(ResponseCode.EMPTY_ARGUMENT), HttpStatus.OK);
}
mergeRecordFilesTask.doRunTask(idList, type);
return new ResponseEntity<>(HttpStatus.OK);
}
// ================ 自定义功能实现 end ================

@ -0,0 +1,22 @@
package me.zhengjie.modules.taskrecord.rest.vo;
import lombok.Data;
import me.zhengjie.modules.taskrecord.domain.TaskRecord;
import java.io.Serializable;
@Data
public class TaskRecordBuildVO implements Serializable {
/**
*
*/
private TaskRecord resources;
/**
* AB
*/
private String tag;
}

@ -0,0 +1,23 @@
package me.zhengjie.modules.taskrecord.rest.vo;
import lombok.Data;
import java.io.Serializable;
import java.util.List;
/**
* VO
*/
@Data
public class TaskRecordMergeVO implements Serializable {
/**
* Id
*/
private List<Integer> idList;
/**
*
*/
private String type;
}

@ -0,0 +1,22 @@
package me.zhengjie.modules.taskrecord.rest.vo;
import lombok.Data;
import java.io.Serializable;
/**
* VO
*/
@Data
public class TaskRecordSendVO implements Serializable {
/**
* Id
*/
private Integer id;
/**
*
*/
private Long limit;
}

@ -65,7 +65,7 @@ public interface TaskRecordService {
*
* @param resources /
*/
void update(TaskRecord resources);
TaskRecord update(TaskRecord resources);
/**
*

@ -81,11 +81,11 @@ public class TaskRecordServiceImpl implements TaskRecordService {
@Override
@Transactional(rollbackFor = Exception.class)
public void update(TaskRecord resources) {
public TaskRecord update(TaskRecord resources) {
TaskRecord taskRecord = taskRecordRepository.findById(resources.getId()).orElseGet(TaskRecord::new);
ValidationUtil.isNull( taskRecord.getId(),"TaskRecord","id",resources.getId());
taskRecord.copy(resources);
taskRecordRepository.save(taskRecord);
return taskRecordRepository.save(taskRecord);
}
@Override

@ -0,0 +1,103 @@
package me.zhengjie.task;
import com.google.common.collect.Lists;
import lombok.extern.slf4j.Slf4j;
import me.zhengjie.modules.taskrecord.service.TaskRecordService;
import me.zhengjie.modules.taskrecord.service.dto.TaskRecordDto;
import me.zhengjie.utils.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.util.List;
@Component
@Slf4j
public class MergeRecordFilesTask {
/**
* - - listA ListB
*/
private static final String INTERSECTION_TAG = "intersection";
/**
* - -listA listB
*/
private static final String UNION_TAG = "union";
/**
* - -listA - listB
*/
private static final String SUBTRACTION_TAG = "subtraction";
@Autowired
private TaskRecordService taskRecordService;
/**
*
*/
private List<String> tempList;
/**
* ,
*
*/
@Async(value = "MergeRecordFilesTask")
public void doRunTask(List<Integer> ids, String tag){
Long satrtMilliSecond = LocalDateTime.now().toInstant(ZoneOffset.of("+8")).toEpochMilli();
log.info("====== [ task start running, task name is {} ] ======", "MergeRecordFilesTask");
runTask(ids, tag);
Long endMilliSecond = LocalDateTime.now().toInstant(ZoneOffset.of("+8")).toEpochMilli();
log.info("====== [ task start end, task name is {},cost milliSecond is {} ] ======", "MergeRecordFilesTask", (endMilliSecond - satrtMilliSecond));
}
private void runTask(List<Integer> ids, String tag) {
// 准备需要进行操作的集合内容
ids.forEach(
id->{
TaskRecordDto dto = taskRecordService.findById(id);
if (dto != null){
String localFilePath = dto.getLocalFilePath();
if (StringUtils.isNotBlank(localFilePath)){
try {
List<String> list = Files.readAllLines(Paths.get(localFilePath));
if (!CollectionUtils.isEmpty(list)){
// 执行相关的集合间的操作
mergeEachCollect(list, tag);
}
} catch (IOException e) {
log.error("========= [Read file Error , filePath is : {} ] =========", localFilePath, e);
}
}
}
}
);
}
private void mergeEachCollect(List<String> list, String tag) {
if (CollectionUtils.isEmpty(tempList)){
tempList.addAll(list);
}
// 交
if (StringUtils.equalsIgnoreCase(INTERSECTION_TAG, tag)){
}
// 并
if (StringUtils.equalsIgnoreCase(UNION_TAG, tag)){
}
// 差
if (StringUtils.equalsIgnoreCase(SUBTRACTION_TAG, tag)){
}
}
}

@ -1,14 +1,37 @@
package me.zhengjie.task;
import cn.hutool.core.util.RandomUtil;
import cn.hutool.system.OsInfo;
import cn.hutool.system.SystemUtil;
import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
import me.zhengjie.modules.abmessage.domain.AbMessage;
import me.zhengjie.modules.abmessage.service.AbMessageService;
import me.zhengjie.modules.form.domain.FormMessage;
import me.zhengjie.modules.form.service.FormMessageService;
import me.zhengjie.modules.taskrecord.domain.TaskRecord;
import org.apache.poi.ss.formula.functions.T;
import me.zhengjie.modules.taskrecord.service.TaskRecordService;
import me.zhengjie.utils.FileUtil;
import me.zhengjie.utils.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.domain.Example;
import org.springframework.data.domain.PageRequest;
import org.springframework.data.domain.Pageable;
import org.springframework.data.domain.Slice;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.util.List;
import java.util.stream.Collectors;
/**
*
@ -17,6 +40,65 @@ import java.util.List;
@Slf4j
public class ProduceLocalFileTask {
/**
*
*/
private static final String PRE_FILE = "out_task_";
/**
* (,...) [使]
*/
private static final String SEPARATOR = "_";
/**
* AB
*/
private static final String TAG_AB = "AB";
/**
* FORM
*/
private static final String TAG_FORM = "FORM";
/**
* (,...) [使]
*/
private static final int RANDOM_STRING_LENGTH = 6;
/**
* - 0
*/
private static final int START_PAGE = 0;
/**
* - 2000
*/
private static final int SIZE_PAGE = 2000;
/**
* Windows
*/
@Value(value = "file.windows.path")
private String windowsFilePath;
/**
* Windows
*/
@Value(value = "file.linux.path")
private String linuxFilePath;
/**
* Windows
*/
@Value(value = "file.mac.path")
private String macFilePath;
@Autowired
private TaskRecordService taskRecordService;
@Autowired
private AbMessageService abMessageService;
@Autowired
private FormMessageService formMessageService;
/**
* ,
*
@ -24,15 +106,174 @@ public class ProduceLocalFileTask {
* @param task
*/
@Async(value = "ProduceLocalFileTaskExecutor")
public void doRunTask(TaskRecord task){
public void doRunTask(TaskRecord task, String tag){
Long satrtMilliSecond = LocalDateTime.now().toInstant(ZoneOffset.of("+8")).toEpochMilli();
log.info("====== [ task start running, task name is {} ] ======", "ProduceLocalFileTask");
runTask(task);
runTask(task, tag);
Long endMilliSecond = LocalDateTime.now().toInstant(ZoneOffset.of("+8")).toEpochMilli();
log.info("====== [ task start end, task name is {},cost milliSecond is {} ] ======", "ProduceLocalFileTask", (endMilliSecond - satrtMilliSecond));
}
private void runTask(TaskRecord task) {
private void runTask(TaskRecord task, String tag) {
// 转换需要的查询参数
String params = task.getParams();
TaskRecord taskRecord = new TaskRecord();
// 组装分页查询参数
Pageable pageable = PageRequest.of(START_PAGE, SIZE_PAGE);
// 设置不同的查询条件,去查AB表和Form表的数据
// 构建写入文件的路径
buildFileWritePath(taskRecord);
if (StringUtils.equalsIgnoreCase(TAG_AB, tag)){
// Json参数解析
AbMessage abMessage = new AbMessage();
if (StringUtils.isNotBlank(params)){
abMessage = JSON.parseObject(params, AbMessage.class);
}
// 构建分页查询,并写入到文件中
buildABQueryToFile(taskRecord, pageable, abMessage);
}
if (StringUtils.equalsIgnoreCase(TAG_FORM, tag)){
// Json参数解析
FormMessage formMessage = new FormMessage();
if (StringUtils.isNotBlank(params)){
formMessage = JSON.parseObject(params, FormMessage.class);
}
// 构建分页查询,并写入到文件中
buildFromQueryToFile(taskRecord, pageable, formMessage);
}
}
private void buildFromQueryToFile(TaskRecord taskRecord, Pageable pageable, FormMessage formMessage) {
if (formMessage == null){
log.error("============[Parse formMessage fail, please check formMessage info ]============");
return;
}
Example<FormMessage> example = Example.of(formMessage);
while(true){
// AB表查询
Slice<FormMessage> slice = formMessageService.queryAll(example, pageable);
if (slice == null){
log.error("============[Query FormMessage fail, please check example info : {} ]============", example.getProbe().toString());
return;
}
List<FormMessage> abMessageList = slice.getContent();
if (!CollectionUtils.isEmpty(abMessageList)){
// 只获取需要的字段
List<String> collect = abMessageList.stream()
.map(FormMessage::getUid)
.collect(Collectors.toList());
// 设置总数
taskRecord.setTotal((long) slice.getSize());
// 解析并写入文件
writeToFile(taskRecord, collect);
}
if (!slice.hasNext()){
break;
}
pageable = slice.nextPageable();
}
}
private void buildABQueryToFile(TaskRecord taskRecord, Pageable pageable, AbMessage abMessage) {
if (abMessage == null){
log.error("============[Parse abMessage fail, please check abMessage info ]============");
return;
}
Example<AbMessage> example = Example.of(abMessage);
while(true){
// AB表查询
Slice<AbMessage> slice = abMessageService.queryAll(example, pageable);
if (slice == null){
log.error("============[Query abMessage fail, please check example info : {} ]============", example.getProbe().toString());
return;
}
List<AbMessage> abMessageList = slice.getContent();
if (!CollectionUtils.isEmpty(abMessageList)){
// 只获取需要的字段
List<String> collect = abMessageList.stream()
.map(AbMessage::getUid)
.collect(Collectors.toList());
// 设置总数
taskRecord.setTotal((long) slice.getSize());
// 解析并写入文件
writeToFile(taskRecord, collect);
}
if (!slice.hasNext()){
break;
}
pageable = slice.nextPageable();
}
}
/**
*
*
* @param taskRecord
*/
private void buildFileWritePath(TaskRecord taskRecord){
// 获取环境配置信息
OsInfo osInfo = SystemUtil.getOsInfo();
String fullPath;
if (osInfo.isWindows()){
// 构建存储文件
fullPath = windowsFilePath + buildOnlyFileNameRule();
}else if (osInfo.isLinux()){
// 构建存储文件
fullPath = linuxFilePath + buildOnlyFileNameRule();
}else if (osInfo.isMac()){
// 构建存储文件
// 构建存储文件
fullPath = macFilePath + buildOnlyFileNameRule();
}else {
return;
}
taskRecord.setLocalFilePath(fullPath);
}
/**
* ,
*
* @param taskRecord
* @param collect
*/
private void writeToFile(TaskRecord taskRecord, List<String> collect){
String fullPath = taskRecord.getLocalFilePath();
if (StringUtils.isBlank(fullPath)){
return;
}
// 构建存储文件
try {
if (!FileUtil.exist(fullPath)){
Files.write(Paths.get(fullPath), collect, StandardOpenOption.CREATE_NEW);
TaskRecord update = taskRecordService.update(taskRecord);
if (update == null){
log.error("============[Write taskRecord fail, please check taskRecord info : {} ]============", taskRecord.toString());
}
}else{
Files.write(Paths.get(fullPath), collect, StandardOpenOption.APPEND);
TaskRecord update = taskRecordService.update(taskRecord);
if (update == null){
log.error("============[Write taskRecord fail, please check taskRecord info : {} ]============", taskRecord.toString());
}
}
} catch (IOException e) {
log.error("write prepare send file fail, please check param, fullPath is {}, {}", fullPath, e );
}
}
/**
*
*
* @return
*/
private String buildOnlyFileNameRule(){
String fileName = PRE_FILE +
RandomUtil.randomString(RANDOM_STRING_LENGTH) +
SEPARATOR +
Instant.now().getEpochSecond();
return fileName;
}
}

@ -1,11 +1,18 @@
package me.zhengjie.task;
import cn.hutool.http.HttpResponse;
import com.alibaba.fastjson.JSON;
import com.google.common.collect.Lists;
import lombok.extern.slf4j.Slf4j;
import me.zhengjie.modules.taskrecord.domain.TaskRecord;
import me.zhengjie.modules.taskrecord.service.TaskRecordService;
import me.zhengjie.modules.taskrecord.service.dto.TaskRecordDto;
import me.zhengjie.utils.HttpUtil;
import me.zhengjie.utils.StringUtils;
import org.apache.poi.ss.formula.functions.T;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
@ -16,19 +23,31 @@ import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
/**
*
*/
@Component
@Slf4j
public class SendRecordTask <T> {
public class SendRecordTask {
/**
*
*/
private static final int BUILD_STATUS = 1;
/**
*
*/
private static final int SEND_LIMIT = 500;
/**
*
*/
@Value(value = "req.db.url")
private String url;
@Autowired
private TaskRecordService taskRecordService;
@ -39,35 +58,74 @@ public class SendRecordTask <T> {
* @return
*/
@Async(value = "SendRecordTaskExecutor")
public void doRunTask(List<Integer> list){
public void doRunTask(Integer id, Long limit){
Long satrtMilliSecond = LocalDateTime.now().toInstant(ZoneOffset.of("+8")).toEpochMilli();
log.info("====== [ task start running, task name is {} ] ======", "SendRecordTask");
runTask(list);
runTask(id, limit);
Long endMilliSecond = LocalDateTime.now().toInstant(ZoneOffset.of("+8")).toEpochMilli();
log.info("====== [ task start end, task name is {},cost milliSecond is {} ] ======", "SendRecordTask", (endMilliSecond - satrtMilliSecond));
}
private void runTask(List<Integer> list) {
private void runTask(Integer id, Long limit) {
// 准备所有需要发送任务
List<TaskRecordDto> readySendTasks = new ArrayList<>();
list.forEach(
id->{
TaskRecordDto taskRecordDto = taskRecordService.findById(id);
if (BUILD_STATUS == taskRecordDto.getIsBuild() && StringUtils.isNotBlank(taskRecordDto.getLocalFilePath())){
readySendTasks.add(taskRecordDto);
}
TaskRecordDto taskRecordDto = taskRecordService.findById(id);
if (BUILD_STATUS == taskRecordDto.getIsBuild() && StringUtils.isNotBlank(taskRecordDto.getLocalFilePath())){
String localFilePath = taskRecordDto.getLocalFilePath();
if (StringUtils.isBlank(localFilePath)){
log.error("============== [ localFilePath is Empty, please check taskRecordDto : {} ]==============", taskRecordDto.toString());
return;
}
try {
// tips : 发送前需要调用前端接口进行发送数目合法性校验
List<String> lines = Files.readAllLines(Paths.get(localFilePath));
// 限制发送次数 为limit以下
Long sendTotal = taskRecordDto.getSendTotal();
if (sendTotal <= 0 ){
return;
}
);
// 遍历所有的待发送路径进行发送
readySendTasks.forEach(
task->{
String localFilePath = task.getLocalFilePath();
try {
// 发送前已经通过前端接口进行发送数目合法性校验
List<String> lines = Files.readAllLines(Paths.get(localFilePath));
List<String> collect = lines.stream()
.skip(sendTotal)
.limit(limit - sendTotal)
.collect(Collectors.toList());
// 分批进行发送
batchSend(collect);
// 对发送后的记录进行更新
TaskRecord taskRecord = new TaskRecord();
BeanUtils.copyProperties(taskRecordDto, taskRecord);
taskRecord.setSendTotal(taskRecordDto.getSendTotal() + limit);
taskRecordService.update(taskRecord);
} catch (IOException e) {
log.error("Read ready send file fail, send class is {}, ready send file path is {}", this.getClass().getName(), localFilePath, e);
}
}
}
} catch (IOException e) {
log.error("read ready send file fail, send class is {}, ready send file path is {}", this.getClass().getName(), localFilePath, e);
private void batchSend(List<String> collect) {
List<List<String>> partition = Lists.partition(collect, SEND_LIMIT);
partition.forEach(
list->{
// TODO: 2020/9/10 0010 调用推送地址进行推送
String jsonStr = JSON.toJSONString(list);
if (StringUtils.isNotBlank(jsonStr)){
int count = 1;
// 失败重发请求3次
while (count <= 3){
// 调用HTTP请求发送数据
HttpResponse httpResponse = HttpUtil.sendPostReq(url, jsonStr);
// todo 这个判别发送正常的码要进行判别 ok
if (httpResponse.isOk() && httpResponse.body().contains("ok")){
log.info("========== [DB request success, response is {} ] ==========", httpResponse.body());
break;
}else{
count ++;
log.error("========== [DB request fail, response is {} ] ==========", httpResponse.body());
}
}
if (count > 3) {
log.error("========== [DB update send status fail, url is {} ] ==========", url);
return;
}
}
}
);

@ -0,0 +1,69 @@
package me.zhengjie;
import lombok.extern.slf4j.Slf4j;
import me.zhengjie.modules.student.domain.Student;
import me.zhengjie.modules.student.repository.StudentRepository;
import me.zhengjie.modules.student.service.StudentService;
import me.zhengjie.utils.FileUtil;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.data.domain.*;
import org.springframework.test.context.junit4.SpringRunner;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
/**
* JPA使
*/
@RunWith(SpringRunner.class)
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
@Slf4j
public class SpringJPATest {
@Autowired
private StudentService studentService;
@Autowired
private StudentRepository studentRepository;
@Test
public void testQueryBySlice() throws Exception{
Pageable pageable = PageRequest.of(0,2);
Student student = new Student();
student.setAge(11);
Example<Student> example = Example.of(student);
while(true){
Slice<Student> slice = studentRepository.findAll(example, pageable);
List<Student> studentList = slice.getContent();
List<String> strList = new ArrayList<>();
AtomicInteger count = new AtomicInteger(1);
int i = 1;
studentList.forEach(
each->{
strList.add(each.toString() + (count.getAndIncrement()) +"_"+ Thread.currentThread().getName());
// strList.add(each.toString() + (count.getAndIncrement()) +"_"+ Thread.currentThread().getName());
}
);
System.out.println("1111");
log.info("111111");
String path = "C:\\Users\\Administrator\\Desktop\\2_1";
if (!FileUtil.exist(path)){
Files.write(Paths.get(path), strList, StandardOpenOption.CREATE_NEW);
}else{
Files.write(Paths.get(path), strList, StandardOpenOption.APPEND);
}
if (!slice.hasNext()){
break;
}
pageable = slice.nextPageable();
}
}
}

@ -0,0 +1,52 @@
package me.zhengjie;
import com.google.common.collect.Lists;
import me.zhengjie.utils.FileUtil;
import org.junit.Test;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.util.Arrays;
import java.util.List;
/**
*
*/
public class TempTest {
@Test
public void testListWrite() throws IOException {
List<String> list = Arrays.asList("111","222");
Files.write(Paths.get("C:\\Users\\Administrator\\Desktop\\2"), list);
}
@Test
public void testWriteOne() {
List<String> list = Arrays.asList("111","222");
list.forEach(
one->{
try {
Files.write(Paths.get("C:\\Users\\Administrator\\Desktop\\2"), one.getBytes(), StandardOpenOption.APPEND);
} catch (IOException e) {
e.printStackTrace();
}
}
);
}
@Test
public void testListWrite2() throws IOException {
List<String> list = Arrays.asList("111","222");
String path = "C:\\Users\\Administrator\\Desktop\\2";
if (!FileUtil.exist(path)){
Files.write(Paths.get(path), list, StandardOpenOption.CREATE_NEW);
}else{
Files.write(Paths.get(path), list, StandardOpenOption.APPEND);
}
}
}
Loading…
Cancel
Save