AB单创建逻辑测试完成

master
土豆兄弟 4 years ago
parent 83b512789b
commit d4f012bbcc

@ -14,10 +14,12 @@ public enum ResponseCode {
SUCCESS(0,"SUCCESS"), SUCCESS(0,"SUCCESS"),
ERROR(1,"ERROR"), ERROR(1,"ERROR"),
// 请求参数校验 // 通用请求参数校验
ILLEGAL_ARGUMENT(1,"请求参数格式错误"), ILLEGAL_ARGUMENT(1,"请求参数格式错误"),
EMPTY_ARGUMENT(1,"请求参数为空"), EMPTY_ARGUMENT(1,"请求参数为空"),
NO_MATCH_ARGUMENT_SET(1,"不能满足要求的参数设置"), NO_MATCH_ARGUMENT_SET(1,"不能满足要求的参数设置"),
// 特殊需要进行前端返回说明的参数定义
TASK_NAME_IS_EXIST(1,"任务名称已经存在"),
// 请求结果性的错误 // 请求结果性的错误
NODATA_ERROR(1,"查询结果为空"), NODATA_ERROR(1,"查询结果为空"),
TASK_BUILD_ERROR(1,"任务建立失败"), TASK_BUILD_ERROR(1,"任务建立失败"),

@ -0,0 +1,117 @@
package me.zhengjie.config.thread;
import org.apache.tomcat.util.threads.ThreadPoolExecutor;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
/**
* 线,线
*/
@Configuration
@EnableAsync
public class ThreadPoolConfig {
/**
* 线
*/
/**
* produce
*/
@Value(value = "${produce.task.thread_pool.corePoolSize}")
private int produceCorePoolSize = 2;
@Value(value = "${produce.task.thread_pool.maxPoolSize}")
private int produceMaxPoolSize = 16;
@Value(value = "${produce.task.thread_pool.queueCapacity}")
private int produceQueueCapacity = 3;
@Value(value = "${produce.task.thread_pool.ThreadNamePrefix}")
private String produceThreadNamePrefix = "ProduceLocalFileTaskExecutor-"; // fixme 这个属性暂时没用起来
/**
* produce
*/
@Value(value = "${send.task.thread_pool.corePoolSize}")
private int sendCorePoolSize = 2;
@Value(value = "${send.task.thread_pool.maxPoolSize}")
private int sendMaxPoolSize = 16;
@Value(value = "${send.task.thread_pool.queueCapacity}")
private int sendQueueCapacity = 3;
@Value(value = "${send.task.thread_pool.ThreadNamePrefix}")
private String sendThreadNamePrefix = "ProduceLocalFileTaskExecutor-"; // fixme 这个属性暂时没用起来
/**
* produce
*/
@Value(value = "${merge.task.thread_pool.corePoolSize}")
private int mergeCorePoolSize = 2;
@Value(value = "${merge.task.thread_pool.maxPoolSize}")
private int mergeMaxPoolSize = 16;
@Value(value = "${merge.task.thread_pool.queueCapacity}")
private int mergeQueueCapacity = 3;
@Value(value = "${merge.task.thread_pool.ThreadNamePrefix}")
private String mergeThreadNamePrefix = "ProduceLocalFileTaskExecutor-"; // fixme 这个属性暂时没用起来
/**
* - 线
*
* @return
*/
@Bean(value = "ProduceLocalFileTaskExecutor")
public Executor produceLocalFileTaskExecutor() {
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
produceCorePoolSize,
produceMaxPoolSize,
3,
TimeUnit.SECONDS,
new LinkedBlockingDeque<>(produceQueueCapacity),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.DiscardOldestPolicy()
);
return threadPoolExecutor;
}
/**
* - 线
*
* @return
*/
@Bean(value = "SendRecordTaskExecutor")
public Executor sendRecordTaskExecutor() {
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
sendCorePoolSize,
sendMaxPoolSize,
3,
TimeUnit.SECONDS,
new LinkedBlockingDeque<>(sendQueueCapacity),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.DiscardOldestPolicy()
);
return threadPoolExecutor;
}
/**
* - 线
* @return
*/
@Bean(value = "MergeRecordTaskExecutor")
public Executor mergeRecordTaskExecutor() {
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
mergeCorePoolSize,
mergeMaxPoolSize,
3,
TimeUnit.SECONDS,
new LinkedBlockingDeque<>(mergeQueueCapacity),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.DiscardOldestPolicy()
);
return threadPoolExecutor;
}
}

@ -58,6 +58,15 @@ public interface AbMessageService {
* @return * @return
*/ */
Slice<AbMessage> queryAll(Example example, Pageable pageable); Slice<AbMessage> queryAll(Example example, Pageable pageable);
/**
*
*
* @param criteria
* @param pageable
* @return
*/
Slice<AbMessage> queryAllSlice(AbMessageQueryCriteria criteria, Pageable pageable);
/** /**
* ID * ID
* @param id ID * @param id ID

@ -66,6 +66,28 @@ public class AbMessageServiceImpl implements AbMessageService {
return abMessageMapper.toDto(abMessageRepository.findAll((root, criteriaQuery, criteriaBuilder) -> QueryHelp.getPredicate(root,criteria,criteriaBuilder))); return abMessageMapper.toDto(abMessageRepository.findAll((root, criteriaQuery, criteriaBuilder) -> QueryHelp.getPredicate(root,criteria,criteriaBuilder)));
} }
/**
*
* @param example
* @param pageable
* @return
*/
@Override
public Slice<AbMessage> queryAll(Example example, Pageable pageable) {
return abMessageRepository.findAll((root, criteriaQuery, criteriaBuilder) -> QueryHelp.getPredicate(root,example,criteriaBuilder),pageable);
}
/**
*
* @param criteria
* @param pageable
* @return
*/
@Override
public Slice<AbMessage> queryAllSlice(AbMessageQueryCriteria criteria, Pageable pageable) {
return abMessageRepository.findAll((root, criteriaQuery, criteriaBuilder) -> QueryHelp.getPredicate(root,criteria,criteriaBuilder),pageable);
}
@Override @Override
@Transactional @Transactional
public AbMessageDto findById(Long id) { public AbMessageDto findById(Long id) {
@ -115,16 +137,7 @@ public class AbMessageServiceImpl implements AbMessageService {
FileUtil.downloadExcel(list, response); FileUtil.downloadExcel(list, response);
} }
/**
*
* @param example
* @param pageable
* @return
*/
@Override
public Slice<AbMessage> queryAll(Example example, Pageable pageable) {
return abMessageRepository.findAll(example, pageable);
}
@Override @Override
@Deprecated @Deprecated

@ -54,11 +54,11 @@ public interface FormMessageService {
/** /**
* *
* *
* @param example * @param criteria
* @param pageable * @param pageable
* @return * @return
*/ */
Slice<FormMessage> queryAll(Example example, Pageable pageable); Slice<FormMessage> queryAllSlice(FormMessageQueryCriteria criteria, Pageable pageable);
/** /**
* ID * ID

@ -64,8 +64,8 @@ public class FormMessageServiceImpl implements FormMessageService {
} }
@Override @Override
public Slice<FormMessage> queryAll(Example example, Pageable pageable) { public Slice<FormMessage> queryAllSlice(FormMessageQueryCriteria criteria, Pageable pageable) {
return formMessageRepository.findAll(example, pageable); return formMessageRepository.findAll((root, criteriaQuery, criteriaBuilder) -> QueryHelp.getPredicate(root,criteria,criteriaBuilder), pageable);
} }
@Override @Override

@ -25,4 +25,6 @@ import org.springframework.data.jpa.repository.JpaSpecificationExecutor;
* @date 2020-09-09 * @date 2020-09-09
**/ **/
public interface TaskRecordRepository extends JpaRepository<TaskRecord, Integer>, JpaSpecificationExecutor<TaskRecord> { public interface TaskRecordRepository extends JpaRepository<TaskRecord, Integer>, JpaSpecificationExecutor<TaskRecord> {
} }

@ -15,7 +15,7 @@
*/ */
package me.zhengjie.modules.taskrecord.rest; package me.zhengjie.modules.taskrecord.rest;
import cn.hutool.core.util.NumberUtil; import me.zhengjie.annotation.AnonymousAccess;
import me.zhengjie.annotation.Log; import me.zhengjie.annotation.Log;
import me.zhengjie.common.CommonResponse; import me.zhengjie.common.CommonResponse;
import me.zhengjie.common.ResponseCode; import me.zhengjie.common.ResponseCode;
@ -118,9 +118,15 @@ public class TaskRecordController {
@PostMapping("/buildTask") @PostMapping("/buildTask")
@Log("新建课包任务") @Log("新建课包任务")
@ApiOperation("新建课包任务") @ApiOperation("新建课包任务")
@PreAuthorize("@el.check('taskRecord:add')") // @PreAuthorize("@el.check('taskRecord:add')")
@AnonymousAccess
public ResponseEntity<Object> buildTask(@RequestBody TaskRecordBuildVO taskRecordBuildVO){ public ResponseEntity<Object> buildTask(@RequestBody TaskRecordBuildVO taskRecordBuildVO){
TaskRecord resources = taskRecordBuildVO.getResources(); TaskRecord resources = taskRecordBuildVO.getResources();
// 任务名称不能重复
String taskName = resources.getTaskName();
if (!checkTaskNameForBuild(taskName)){
return new ResponseEntity<>(CommonResponse.createByError(ResponseCode.TASK_NAME_IS_EXIST), HttpStatus.OK);
}
String tag = taskRecordBuildVO.getTag(); String tag = taskRecordBuildVO.getTag();
if (StringUtils.isBlank(tag)){ if (StringUtils.isBlank(tag)){
return new ResponseEntity<>(CommonResponse.createByError(ResponseCode.EMPTY_ARGUMENT), HttpStatus.OK); return new ResponseEntity<>(CommonResponse.createByError(ResponseCode.EMPTY_ARGUMENT), HttpStatus.OK);
@ -131,6 +137,16 @@ public class TaskRecordController {
return new ResponseEntity<>(CommonResponse.createBySuccess(), HttpStatus.OK); return new ResponseEntity<>(CommonResponse.createBySuccess(), HttpStatus.OK);
} }
private boolean checkTaskNameForBuild(String taskName){
TaskRecordQueryCriteria criteria = new TaskRecordQueryCriteria();
criteria.setTaskName(taskName);
List<TaskRecordDto> taskRecordDtos = taskRecordService.queryAll(criteria);
if (!CollectionUtils.isEmpty(taskRecordDtos)){
return Boolean.FALSE;
}
return Boolean.TRUE;
}
/** /**
* ,, * ,,
* *
@ -139,8 +155,9 @@ public class TaskRecordController {
*/ */
@Log("发送课包任务") @Log("发送课包任务")
@ApiOperation("发送课包任务") @ApiOperation("发送课包任务")
@PreAuthorize("@el.check('taskRecord:list')") // @PreAuthorize("@el.check('taskRecord:list')")
@PostMapping(value = "/sendTask") @PostMapping(value = "/sendTask")
@AnonymousAccess
public ResponseEntity<Object> sendTask(@RequestBody TaskRecordSendVO taskRecordSendVO){ public ResponseEntity<Object> sendTask(@RequestBody TaskRecordSendVO taskRecordSendVO){
// 参数校验 // 参数校验
Integer id = taskRecordSendVO.getId(); Integer id = taskRecordSendVO.getId();
@ -162,8 +179,9 @@ public class TaskRecordController {
*/ */
@Log("发送课包数目校验") @Log("发送课包数目校验")
@ApiOperation("发送课包数目校验") @ApiOperation("发送课包数目校验")
@PreAuthorize("@el.check('taskRecord:list')") // @PreAuthorize("@el.check('taskRecord:list')")
@PostMapping(value = "/checkSendTotal") @PostMapping(value = "/checkSendTotal")
@AnonymousAccess
public ResponseEntity<Object> checkSendTotal(@RequestParam(value = "sendTotal", defaultValue = "0") Long sendTotal, public ResponseEntity<Object> checkSendTotal(@RequestParam(value = "sendTotal", defaultValue = "0") Long sendTotal,
@RequestParam(value = "id")Integer id){ @RequestParam(value = "id")Integer id){
if (sendTotal == null || id == null){ if (sendTotal == null || id == null){
@ -198,8 +216,9 @@ public class TaskRecordController {
*/ */
@Log("任务课包任务合并") @Log("任务课包任务合并")
@ApiOperation("任务课包任务合并") @ApiOperation("任务课包任务合并")
@PreAuthorize("@el.check('taskRecord:list')") // @PreAuthorize("@el.check('taskRecord:list')")
@PostMapping(value = "/mergeRecord") @PostMapping(value = "/mergeRecord")
@AnonymousAccess
public ResponseEntity<Object> mergeRecord(@RequestBody TaskRecordMergeVO taskRecordMergeVO){ public ResponseEntity<Object> mergeRecord(@RequestBody TaskRecordMergeVO taskRecordMergeVO){
List<Integer> idList = taskRecordMergeVO.getIdList(); List<Integer> idList = taskRecordMergeVO.getIdList();
String type = taskRecordMergeVO.getType(); String type = taskRecordMergeVO.getType();

@ -15,7 +15,7 @@ public class TaskRecordBuildVO implements Serializable {
private TaskRecord resources; private TaskRecord resources;
/** /**
* AB * AB
*/ */
private String tag; private String tag;

@ -69,12 +69,12 @@ public class TaskRecordServiceImpl implements TaskRecordService {
return taskRecordMapper.toDto(taskRecord); return taskRecordMapper.toDto(taskRecord);
} }
@Override @Override
@Transactional(rollbackFor = Exception.class) @Transactional(rollbackFor = Exception.class)
public TaskRecordDto create(TaskRecord resources) { public TaskRecordDto create(TaskRecord resources) {
// todo 增加新建课包的逻辑 // todo 增加新建课包的逻辑
TaskRecordDto taskRecordDto = taskRecordMapper.toDto(taskRecordRepository.save(resources)); TaskRecordDto taskRecordDto = taskRecordMapper.toDto(taskRecordRepository.save(resources));
return taskRecordDto; return taskRecordDto;
} }

@ -1,7 +1,5 @@
package me.zhengjie.task; package me.zhengjie.task;
import com.alipay.api.domain.Person;
import com.google.common.collect.Lists;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import me.zhengjie.modules.taskrecord.domain.TaskRecord; import me.zhengjie.modules.taskrecord.domain.TaskRecord;
import me.zhengjie.modules.taskrecord.service.TaskRecordService; import me.zhengjie.modules.taskrecord.service.TaskRecordService;
@ -60,7 +58,7 @@ public class MergeRecordFilesTask {
* , * ,
* *
*/ */
@Async(value = "MergeRecordFilesTask") @Async(value = "MergeRecordTaskExecutor")
public void doRunTask(List<Integer> ids, String tag, TaskRecord taskRecord){ public void doRunTask(List<Integer> ids, String tag, TaskRecord taskRecord){
Long satrtMilliSecond = LocalDateTime.now().toInstant(ZoneOffset.of("+8")).toEpochMilli(); Long satrtMilliSecond = LocalDateTime.now().toInstant(ZoneOffset.of("+8")).toEpochMilli();
log.info("====== [ task start running, task name is {} ] ======", "MergeRecordFilesTask"); log.info("====== [ task start running, task name is {} ] ======", "MergeRecordFilesTask");

@ -1,18 +1,23 @@
package me.zhengjie.task; package me.zhengjie.task;
import cn.hutool.core.util.RandomUtil;
import cn.hutool.system.OsInfo; import cn.hutool.system.OsInfo;
import cn.hutool.system.SystemUtil; import cn.hutool.system.SystemUtil;
import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSON;
import com.sun.org.apache.xpath.internal.operations.Bool;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import me.zhengjie.modules.abmessage.domain.AbMessage; import me.zhengjie.modules.abmessage.domain.AbMessage;
import me.zhengjie.modules.abmessage.service.AbMessageService; import me.zhengjie.modules.abmessage.service.AbMessageService;
import me.zhengjie.modules.abmessage.service.dto.AbMessageQueryCriteria;
import me.zhengjie.modules.form.domain.FormMessage; import me.zhengjie.modules.form.domain.FormMessage;
import me.zhengjie.modules.form.service.FormMessageService; import me.zhengjie.modules.form.service.FormMessageService;
import me.zhengjie.modules.form.service.dto.FormMessageQueryCriteria;
import me.zhengjie.modules.taskrecord.domain.TaskRecord; import me.zhengjie.modules.taskrecord.domain.TaskRecord;
import me.zhengjie.modules.taskrecord.service.TaskRecordService; import me.zhengjie.modules.taskrecord.service.TaskRecordService;
import me.zhengjie.modules.taskrecord.service.dto.TaskRecordDto;
import me.zhengjie.modules.taskrecord.service.dto.TaskRecordQueryCriteria;
import me.zhengjie.utils.FileUtil; import me.zhengjie.utils.FileUtil;
import me.zhengjie.utils.StringUtils; import me.zhengjie.utils.StringUtils;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value; import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.domain.Example; import org.springframework.data.domain.Example;
@ -27,10 +32,11 @@ import java.io.IOException;
import java.nio.file.Files; import java.nio.file.Files;
import java.nio.file.Paths; import java.nio.file.Paths;
import java.nio.file.StandardOpenOption; import java.nio.file.StandardOpenOption;
import java.time.Instant;
import java.time.LocalDateTime; import java.time.LocalDateTime;
import java.time.ZoneOffset; import java.time.ZoneOffset;
import java.util.List; import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors; import java.util.stream.Collectors;
/** /**
@ -50,7 +56,6 @@ public class ProduceLocalFileTask {
*/ */
private static final String TAG_FORM = "FORM"; private static final String TAG_FORM = "FORM";
/** /**
* - 0 * - 0
*/ */
@ -59,7 +64,7 @@ public class ProduceLocalFileTask {
/** /**
* - 2000 * - 2000
*/ */
private static final int SIZE_PAGE = 2000; private static final int SIZE_PAGE = 200;
/** /**
* *
@ -69,17 +74,17 @@ public class ProduceLocalFileTask {
/** /**
* Windows * Windows
*/ */
@Value(value = "file.windows.path") @Value(value = "${file.windows.path}")
private String windowsFilePath; private String windowsFilePath;
/** /**
* Windows * Windows
*/ */
@Value(value = "file.linux.path") @Value(value = "${file.linux.path}")
private String linuxFilePath; private String linuxFilePath;
/** /**
* Windows * Windows
*/ */
@Value(value = "file.mac.path") @Value(value = "${file.mac.path}")
private String macFilePath; private String macFilePath;
@Autowired @Autowired
@ -107,12 +112,11 @@ public class ProduceLocalFileTask {
private void runTask(TaskRecord task, String tag) { private void runTask(TaskRecord task, String tag) {
// 转换需要的查询参数 // 转换需要的查询参数
String params = task.getParams(); String params = task.getParams();
TaskRecord taskRecord = new TaskRecord();
// 组装分页查询参数 // 组装分页查询参数
Pageable pageable = PageRequest.of(START_PAGE, SIZE_PAGE); Pageable pageable = PageRequest.of(START_PAGE, SIZE_PAGE);
// 设置不同的查询条件,去查AB表和Form表的数据 // 设置不同的查询条件,去查AB表和Form表的数据
// 构建写入文件的路径 // 构建写入文件的路径
buildFileWritePath(taskRecord); buildFileWritePath(task);
if (StringUtils.equalsIgnoreCase(TAG_AB, tag)){ if (StringUtils.equalsIgnoreCase(TAG_AB, tag)){
// Json参数解析 // Json参数解析
AbMessage abMessage = new AbMessage(); AbMessage abMessage = new AbMessage();
@ -120,7 +124,7 @@ public class ProduceLocalFileTask {
abMessage = JSON.parseObject(params, AbMessage.class); abMessage = JSON.parseObject(params, AbMessage.class);
} }
// 构建分页查询,并写入到文件中 // 构建分页查询,并写入到文件中
buildABQueryToFile(taskRecord, pageable, abMessage); buildABQueryToFile(task, pageable, abMessage);
} }
if (StringUtils.equalsIgnoreCase(TAG_FORM, tag)){ if (StringUtils.equalsIgnoreCase(TAG_FORM, tag)){
@ -130,7 +134,7 @@ public class ProduceLocalFileTask {
formMessage = JSON.parseObject(params, FormMessage.class); formMessage = JSON.parseObject(params, FormMessage.class);
} }
// 构建分页查询,并写入到文件中 // 构建分页查询,并写入到文件中
buildFromQueryToFile(taskRecord, pageable, formMessage); buildFromQueryToFile(task, pageable, formMessage);
} }
} }
@ -139,22 +143,30 @@ public class ProduceLocalFileTask {
log.error("============[Parse formMessage fail, please check formMessage info ]============"); log.error("============[Parse formMessage fail, please check formMessage info ]============");
return; return;
} }
Example<FormMessage> example = Example.of(formMessage); FormMessageQueryCriteria formMessageQueryCriteria = new FormMessageQueryCriteria();
BeanUtils.copyProperties(formMessage, formMessageQueryCriteria);
int count = 0; // 记录总的生成记录数
while(true){ while(true){
// AB表查询 // AB表查询
Slice<FormMessage> slice = formMessageService.queryAll(example, pageable); Slice<FormMessage> slice = formMessageService.queryAllSlice(formMessageQueryCriteria, pageable);
if (slice == null){ if (slice == null){
log.error("============[Query FormMessage fail, please check example info : {} ]============", example.getProbe().toString()); log.error("============[Query FormMessage fail, please check example info : {} ]============", formMessageQueryCriteria.toString());
return; return;
} }
List<FormMessage> abMessageList = slice.getContent(); List<FormMessage> formMessageList = slice.getContent();
if (!CollectionUtils.isEmpty(abMessageList)){ if (!CollectionUtils.isEmpty(formMessageList)){
// 只获取需要的字段 // 只获取需要的字段
List<String> collect = abMessageList.stream() List<String> collect = formMessageList.stream()
.map(FormMessage::getUid) .map(FormMessage::getUid)
.collect(Collectors.toList()); .collect(Collectors.toList());
// 设置总数 // 设置总数
taskRecord.setTotal((long) slice.getSize()); count += formMessageList.size();
taskRecord.setTotal((long) count);
TaskRecordDto taskRecordDto = taskRecordService.create(taskRecord);
if (taskRecordDto == null){
log.error("============[Write taskRecord fail, please check example info : {} ]============");
return;
}
// 解析并写入文件 // 解析并写入文件
writeToFile(taskRecord, collect); writeToFile(taskRecord, collect);
} }
@ -170,12 +182,14 @@ public class ProduceLocalFileTask {
log.error("============[Parse abMessage fail, please check abMessage info ]============"); log.error("============[Parse abMessage fail, please check abMessage info ]============");
return; return;
} }
Example<AbMessage> example = Example.of(abMessage); AbMessageQueryCriteria abMessageQueryCriteria = new AbMessageQueryCriteria();
BeanUtils.copyProperties(abMessage, abMessageQueryCriteria);
int count = 0; // 记录总的生成记录数
while(true){ while(true){
// AB表查询 // AB表查询
Slice<AbMessage> slice = abMessageService.queryAll(example, pageable); Slice<AbMessage> slice = abMessageService.queryAllSlice(abMessageQueryCriteria, pageable);
if (slice == null){ if (slice == null){
log.error("============[Query abMessage fail, please check example info : {} ]============", example.getProbe().toString()); log.error("============[Query abMessage fail, please check example info : {} ]============", abMessageQueryCriteria.toString());
return; return;
} }
List<AbMessage> abMessageList = slice.getContent(); List<AbMessage> abMessageList = slice.getContent();
@ -185,11 +199,19 @@ public class ProduceLocalFileTask {
.map(AbMessage::getUid) .map(AbMessage::getUid)
.collect(Collectors.toList()); .collect(Collectors.toList());
// 设置总数 // 设置总数
taskRecord.setTotal((long) slice.getSize()); count += abMessageList.size();
taskRecord.setTotal((long) count);
// 解析并写入文件 // 解析并写入文件
TaskRecordDto taskRecordDto = taskRecordService.create(taskRecord);
if (taskRecordDto == null){
log.error("============[Write taskRecord fail, please check example info : {} ]============");
return;
}
writeToFile(taskRecord, collect); writeToFile(taskRecord, collect);
} }
if (!slice.hasNext()){ if (!slice.hasNext()){
// 更新发送记录
updateRecord(taskRecord);
break; break;
} }
pageable = slice.nextPageable(); pageable = slice.nextPageable();
@ -206,13 +228,15 @@ public class ProduceLocalFileTask {
OsInfo osInfo = SystemUtil.getOsInfo(); OsInfo osInfo = SystemUtil.getOsInfo();
String fullPath; String fullPath;
if (osInfo.isWindows()){ if (osInfo.isWindows()){
FileUtil.mkdir(windowsFilePath);
// 构建存储文件 // 构建存储文件
fullPath = windowsFilePath + FileUtil.buildOnlyFileNameRule(); fullPath = windowsFilePath + FileUtil.buildOnlyFileNameRule();
}else if (osInfo.isLinux()){ }else if (osInfo.isLinux()){
FileUtil.mkdir(linuxFilePath);
// 构建存储文件 // 构建存储文件
fullPath = linuxFilePath + FileUtil.buildOnlyFileNameRule(); fullPath = linuxFilePath + FileUtil.buildOnlyFileNameRule();
}else if (osInfo.isMac()){ }else if (osInfo.isMac()){
// 构建存储文件 FileUtil.mkdir(macFilePath);
// 构建存储文件 // 构建存储文件
fullPath = macFilePath + FileUtil.buildOnlyFileNameRule(); fullPath = macFilePath + FileUtil.buildOnlyFileNameRule();
}else { }else {
@ -236,25 +260,31 @@ public class ProduceLocalFileTask {
try { try {
if (!FileUtil.exist(fullPath)){ if (!FileUtil.exist(fullPath)){
Files.write(Paths.get(fullPath), collect, StandardOpenOption.CREATE_NEW); Files.write(Paths.get(fullPath), collect, StandardOpenOption.CREATE_NEW);
taskRecord.setIsBuild(SUCESS_BUILD_TAG);
TaskRecord update = taskRecordService.update(taskRecord);
if (update == null){
log.error("============[Write taskRecord fail, please check taskRecord info : {} ]============", taskRecord.toString());
}
}else{ }else{
Files.write(Paths.get(fullPath), collect, StandardOpenOption.APPEND); Files.write(Paths.get(fullPath), collect, StandardOpenOption.APPEND);
taskRecord.setIsBuild(SUCESS_BUILD_TAG);
TaskRecord update = taskRecordService.update(taskRecord);
if (update == null){
log.error("============[Write taskRecord fail, please check taskRecord info : {} ]============", taskRecord.toString());
}
} }
} catch (IOException e) { } catch (IOException e) {
log.error("write prepare send file fail, please check param, fullPath is {}, {}", fullPath, e ); log.error("write prepare send file fail, please check param, fullPath is {}, {}", fullPath, e );
} }
} }
private boolean updateRecord(TaskRecord taskRecord){
// 查询并更新发送记录
TaskRecordQueryCriteria queryCriteria = new TaskRecordQueryCriteria();
queryCriteria.setTaskName(taskRecord.getTaskName());
List<TaskRecordDto> taskRecordDtos = taskRecordService.queryAll(queryCriteria);
if (!CollectionUtils.isEmpty(taskRecordDtos) && taskRecordDtos.size() == 1){
TaskRecordDto taskRecordDto = taskRecordDtos.get(0);
taskRecord.setId(taskRecordDto.getId());
taskRecord.setIsBuild(SUCESS_BUILD_TAG);
TaskRecord update = taskRecordService.update(taskRecord);
if (update == null){
log.error("============[Write taskRecord fail, please check taskRecord info : {} ]============", taskRecord.toString());
return Boolean.FALSE;
}
}
return Boolean.TRUE;
}
} }

@ -50,13 +50,12 @@ public class SendRecordTask {
/** /**
* *
*/ */
@Value(value = "req.db.url") @Value(value = "${req.db.url}")
private String url; private String url;
@Autowired @Autowired
private TaskRecordService taskRecordService; private TaskRecordService taskRecordService;
/** /**
* , * ,
* *
@ -71,6 +70,7 @@ public class SendRecordTask {
log.info("====== [ task start end, task name is {},cost milliSecond is {} ] ======", "SendRecordTask", (endMilliSecond - satrtMilliSecond)); log.info("====== [ task start end, task name is {},cost milliSecond is {} ] ======", "SendRecordTask", (endMilliSecond - satrtMilliSecond));
} }
private void runTask(Integer id, Long limit) { private void runTask(Integer id, Long limit) {
// 准备所有需要发送任务 // 准备所有需要发送任务
TaskRecordDto taskRecordDto = taskRecordService.findById(id); TaskRecordDto taskRecordDto = taskRecordService.findById(id);

@ -115,3 +115,25 @@ file:
req: req:
db: db:
url: # 设置给大坝回传号码的地址 url: # 设置给大坝回传号码的地址
# 线程池的相关配置
produce:
task:
thread_pool:
corePoolSize: 2
maxPoolSize: 16
queueCapacity: 3
ThreadNamePrefix: 'ProduceLocalFileTaskExecutor-'
send:
task:
thread_pool:
corePoolSize: 2
maxPoolSize: 16
queueCapacity: 3
ThreadNamePrefix: 'ProduceLocalFileTaskExecutor-'
merge:
task:
thread_pool:
corePoolSize: 2
maxPoolSize: 16
queueCapacity: 3
ThreadNamePrefix: 'ProduceLocalFileTaskExecutor-'

@ -120,3 +120,29 @@ file:
# 文件大小 /M # 文件大小 /M
maxSize: 100 maxSize: 100
avatarMaxSize: 5 avatarMaxSize: 5
# 配置请求发送路径
req:
db:
url: # 设置给大坝回传号码的地址
# 线程池的相关配置
produce:
task:
thread_pool:
corePoolSize: 2
maxPoolSize: 16
queueCapacity: 3
ThreadNamePrefix: 'ProduceLocalFileTaskExecutor-'
send:
task:
thread_pool:
corePoolSize: 2
maxPoolSize: 16
queueCapacity: 3
ThreadNamePrefix: 'ProduceLocalFileTaskExecutor-'
merge:
task:
thread_pool:
corePoolSize: 2
maxPoolSize: 16
queueCapacity: 3
ThreadNamePrefix: 'ProduceLocalFileTaskExecutor-'

@ -1,6 +1,10 @@
package me.zhengjie; package me.zhengjie;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import me.zhengjie.modules.abmessage.domain.AbMessage;
import me.zhengjie.modules.abmessage.service.AbMessageService;
import me.zhengjie.modules.abmessage.service.dto.AbMessageQueryCriteria;
import me.zhengjie.modules.mnt.service.dto.AppQueryCriteria;
import me.zhengjie.modules.student.domain.Student; import me.zhengjie.modules.student.domain.Student;
import me.zhengjie.modules.student.repository.StudentRepository; import me.zhengjie.modules.student.repository.StudentRepository;
import me.zhengjie.modules.student.service.StudentService; import me.zhengjie.modules.student.service.StudentService;
@ -17,6 +21,7 @@ import java.nio.file.Paths;
import java.nio.file.StandardOpenOption; import java.nio.file.StandardOpenOption;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
/** /**
@ -33,6 +38,9 @@ public class SpringJPATest {
@Autowired @Autowired
private StudentRepository studentRepository; private StudentRepository studentRepository;
@Autowired
private AbMessageService abMessageService;
@Test @Test
public void testQueryBySlice() throws Exception{ public void testQueryBySlice() throws Exception{
@ -66,4 +74,68 @@ public class SpringJPATest {
pageable = slice.nextPageable(); pageable = slice.nextPageable();
} }
} }
@Test
public void testExample(){
Pageable pageable = PageRequest.of(0,200);
AbMessage abMessage = new AbMessage();
abMessage.setActName("编程猫");
Example<AbMessage> example = Example.of(abMessage);
Slice<AbMessage> slice = abMessageService.queryAll(example, pageable);
System.out.println(slice.getContent());
}
@Test
public void testExampleQuery(){
Pageable pageable = PageRequest.of(0,200);
AbMessageQueryCriteria abMessageQueryCriteria = new AbMessageQueryCriteria();
abMessageQueryCriteria.setActName("编程");
int temp = 0;
while(true){
Slice<AbMessage> slice = abMessageService.queryAllSlice(abMessageQueryCriteria, pageable);
temp += slice.getContent().size();
if (!slice.hasNext()){
break;
}
pageable = slice.nextPageable();
}
System.out.println(temp);
}
@Test
public void testExampleQuery1(){
Pageable pageable = PageRequest.of(0,200);
AbMessageQueryCriteria abMessageQueryCriteria = new AbMessageQueryCriteria();
abMessageQueryCriteria.setActName("编程猫");
Example<AbMessageQueryCriteria> example = Example.of(abMessageQueryCriteria);
Slice<AbMessage> slice = abMessageService.queryAll(example, pageable);
System.out.println(slice.getContent());
}
@Test
public void testExampleQuery2(){
Pageable pageable = PageRequest.of(0,200);
AbMessageQueryCriteria abMessageQueryCriteria = new AbMessageQueryCriteria();
abMessageQueryCriteria.setActName("编程猫");
// Example<AbMessageQueryCriteria> example = Example.of(abMessageQueryCriteria);
Slice<AbMessage> slice = abMessageService.queryAllSlice(abMessageQueryCriteria, pageable);
System.out.println(slice.getContent());
}
@Test
public void testExampleQueryPage(){
Pageable pageable = PageRequest.of(0,200);
AbMessageQueryCriteria abMessageQueryCriteria = new AbMessageQueryCriteria();
abMessageQueryCriteria.setActName("编程猫");
int temp = 0;
Map<String, Object> stringObjectMap = abMessageService.queryAll(abMessageQueryCriteria, pageable);
System.out.println(stringObjectMap.toString());
}
} }

@ -1,6 +1,7 @@
package me.zhengjie; package me.zhengjie;
import com.google.common.collect.Lists; import com.alibaba.fastjson.JSON;
import me.zhengjie.modules.abmessage.domain.AbMessage;
import me.zhengjie.utils.FileUtil; import me.zhengjie.utils.FileUtil;
import org.junit.Test; import org.junit.Test;
@ -49,4 +50,10 @@ public class TempTest {
} }
} }
@Test
public void testJsonParse(){
String json = "{\"actName\":\"编程猫\"}";
System.out.println(JSON.parseObject(json, AbMessage.class));
}
} }

Loading…
Cancel
Save