From 3626c8e0ebe17c9b508afad5033ac456ddf1ef2d Mon Sep 17 00:00:00 2001 From: wjt Date: Sat, 17 Jun 2023 13:37:23 +0800 Subject: [PATCH] =?UTF-8?q?=E5=AE=9A=E6=97=B6=E5=8E=8B=E7=BC=A9=E6=8A=95?= =?UTF-8?q?=E6=94=BE=E6=95=B0=E6=8D=AE?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../TbBDNewBackdataEntityRepository.java | 3 + .../TbGdtNewBackdataEntityRepository.java | 3 + .../TbJLV1BackdataEntityRepository.java | 3 + .../TbJLV2BackdataEntityRepository.java | 3 + .../TbKSNewBackdataEntityRepository.java | 3 + .../repository/entity/TbTfSourceEntity.java | 2 +- .../adcallback/task/TfSourceTaskNew.java | 227 ++++++++++++++++++ 7 files changed, 243 insertions(+), 1 deletion(-) create mode 100644 src/main/java/com/baiyee/adcallback/task/TfSourceTaskNew.java diff --git a/src/main/java/com/baiyee/adcallback/repository/TbBDNewBackdataEntityRepository.java b/src/main/java/com/baiyee/adcallback/repository/TbBDNewBackdataEntityRepository.java index 08df7aa..750ce66 100644 --- a/src/main/java/com/baiyee/adcallback/repository/TbBDNewBackdataEntityRepository.java +++ b/src/main/java/com/baiyee/adcallback/repository/TbBDNewBackdataEntityRepository.java @@ -30,4 +30,7 @@ public interface TbBDNewBackdataEntityRepository extends BaseRepository ?1 AND t.gmtCreate < ?2") List queryByDate(DateTime startDate, DateTime endDate); + + @Query("SELECT count(t) FROM TbJLBackDataV1Entity t WHERE t.imei != '__IMEI__' and t.gmtCreate > ?1 AND t.gmtCreate < ?2") + Integer countByGmtCreate(DateTime startDate, DateTime endDate); } diff --git a/src/main/java/com/baiyee/adcallback/repository/TbGdtNewBackdataEntityRepository.java b/src/main/java/com/baiyee/adcallback/repository/TbGdtNewBackdataEntityRepository.java index ea0925a..78306fb 100644 --- a/src/main/java/com/baiyee/adcallback/repository/TbGdtNewBackdataEntityRepository.java +++ b/src/main/java/com/baiyee/adcallback/repository/TbGdtNewBackdataEntityRepository.java @@ -31,4 +31,7 @@ public interface TbGdtNewBackdataEntityRepository extends BaseRepository ?1 AND t.gmtCreate < ?2") List queryByDate(DateTime startDate, DateTime endDate); + + @Query("SELECT count(t) FROM TbJLBackDataV1Entity t WHERE t.imei != '__IMEI__' and t.gmtCreate > ?1 AND t.gmtCreate < ?2") + Integer countByGmtCreate(DateTime startDate, DateTime endDate); } diff --git a/src/main/java/com/baiyee/adcallback/repository/TbJLV1BackdataEntityRepository.java b/src/main/java/com/baiyee/adcallback/repository/TbJLV1BackdataEntityRepository.java index 13e831a..11d26a8 100644 --- a/src/main/java/com/baiyee/adcallback/repository/TbJLV1BackdataEntityRepository.java +++ b/src/main/java/com/baiyee/adcallback/repository/TbJLV1BackdataEntityRepository.java @@ -39,6 +39,9 @@ public interface TbJLV1BackdataEntityRepository extends BaseRepository ?1 AND t.gmtCreate < ?2") List queryByDate(DateTime startDate, DateTime endDate); + @Query("SELECT count(t) FROM TbJLBackDataV1Entity t WHERE t.imei != '__IMEI__' and t.gmtCreate > ?1 AND t.gmtCreate < ?2") + Integer countByGmtCreate(DateTime startDate, DateTime endDate); + /** * 测试 * diff --git a/src/main/java/com/baiyee/adcallback/repository/TbJLV2BackdataEntityRepository.java b/src/main/java/com/baiyee/adcallback/repository/TbJLV2BackdataEntityRepository.java index 3720f2e..2aeedd2 100644 --- a/src/main/java/com/baiyee/adcallback/repository/TbJLV2BackdataEntityRepository.java +++ b/src/main/java/com/baiyee/adcallback/repository/TbJLV2BackdataEntityRepository.java @@ -23,6 +23,9 @@ public interface TbJLV2BackdataEntityRepository extends BaseRepository ?1 AND t.gmtCreate < ?2") List queryByDate(DateTime startDate, DateTime endDate); + @Query("SELECT count(t) FROM TbJLBackDataV1Entity t WHERE t.imei != '__IMEI__' and t.gmtCreate > ?1 AND t.gmtCreate < ?2") + Integer countByGmtCreate(DateTime startDate, DateTime endDate); + /** * 查询数据 */ diff --git a/src/main/java/com/baiyee/adcallback/repository/TbKSNewBackdataEntityRepository.java b/src/main/java/com/baiyee/adcallback/repository/TbKSNewBackdataEntityRepository.java index 53acb10..cc94739 100644 --- a/src/main/java/com/baiyee/adcallback/repository/TbKSNewBackdataEntityRepository.java +++ b/src/main/java/com/baiyee/adcallback/repository/TbKSNewBackdataEntityRepository.java @@ -30,4 +30,7 @@ public interface TbKSNewBackdataEntityRepository extends BaseRepository ?1 AND t.gmtCreate < ?2") List queryByDate(DateTime startDate, DateTime endDate); + + @Query("SELECT count(t) FROM TbJLBackDataV1Entity t WHERE t.imei != '__IMEI__' and t.gmtCreate > ?1 AND t.gmtCreate < ?2") + Integer countByGmtCreate(DateTime startDate, DateTime endDate); } diff --git a/src/main/java/com/baiyee/adcallback/repository/entity/TbTfSourceEntity.java b/src/main/java/com/baiyee/adcallback/repository/entity/TbTfSourceEntity.java index 82bf6e1..2a49b57 100644 --- a/src/main/java/com/baiyee/adcallback/repository/entity/TbTfSourceEntity.java +++ b/src/main/java/com/baiyee/adcallback/repository/entity/TbTfSourceEntity.java @@ -82,7 +82,7 @@ public class TbTfSourceEntity implements Serializable { this.setFileName(fileName); this.setFilePath(filePath); this.setFilePwd(filePwd); - this.setFileStatus(2); + this.setFileStatus(0); this.setWeight(0); return this; } diff --git a/src/main/java/com/baiyee/adcallback/task/TfSourceTaskNew.java b/src/main/java/com/baiyee/adcallback/task/TfSourceTaskNew.java new file mode 100644 index 0000000..adf7a42 --- /dev/null +++ b/src/main/java/com/baiyee/adcallback/task/TfSourceTaskNew.java @@ -0,0 +1,227 @@ +package com.baiyee.adcallback.task; + +import cn.hutool.core.collection.CollUtil; +import cn.hutool.core.date.DateTime; +import cn.hutool.core.date.DateUtil; +import cn.hutool.core.text.csv.CsvWriter; +import cn.hutool.core.util.RandomUtil; +import cn.hutool.core.util.StrUtil; +import com.baiyee.adcallback.api.dto.TfSourceDTO; +import com.baiyee.adcallback.common.enums.TableEnum; +import com.baiyee.adcallback.common.util.FileUtil; +import com.baiyee.adcallback.repository.*; +import com.baiyee.adcallback.repository.entity.TbTfSourceEntity; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import net.lingala.zip4j.core.ZipFile; +import net.lingala.zip4j.exception.ZipException; +import net.lingala.zip4j.model.ZipParameters; +import net.lingala.zip4j.util.Zip4jConstants; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.scheduling.annotation.Scheduled; +import org.springframework.stereotype.Component; +import org.springframework.transaction.annotation.Transactional; + +import javax.persistence.EntityManager; +import javax.persistence.PersistenceContext; +import javax.persistence.Query; +import java.io.File; +import java.nio.charset.StandardCharsets; +import java.util.*; +import java.util.stream.Collectors; + +/** + * @author wjt + * @date 2023/6/30 + *

+ * 将每天产生得数据压缩成文件 + */ +@Slf4j +@Component +@RequiredArgsConstructor +public class TfSourceTaskNew { + private final TbJLV2BackdataEntityRepository tbJLV2BackdataEntityRepository; + private final TbJLV1BackdataEntityRepository tbJLV1BackdataEntityRepository; + private final TbGdtNewBackdataEntityRepository tbGdtNewBackdataEntityRepository; + private final TbKSNewBackdataEntityRepository tbKSNewBackdataEntityRepository; + private final TbBDNewBackdataEntityRepository tbBDNewBackdataEntityRepository; + private final TbTfSourceEntityRepository tbTfSourceEntityRepository; + @PersistenceContext + private EntityManager entityManager; + @Value("${tf.source.path}") + private String path; + + @Scheduled(cron = "0 0 1 * * ? ") + @Transactional(rollbackFor = Exception.class) + public void compressSourceJlV1() { + log.info("++++++++++++++++++++++source jlv1 compress start time {} ++++++++++++++++", DateUtil.now()); + DateTime startTime = DateUtil.beginOfDay(DateUtil.yesterday()); + DateTime endTime = DateUtil.endOfDay(DateUtil.yesterday()); + Integer total = tbJLV1BackdataEntityRepository.countByGmtCreate(startTime, endTime); + saveData(total, startTime, endTime, TableEnum.JL1.getTableName()); + log.info("++++++++++++++++++++++TfSource jlv1 Sync end time {} ++++++++++++++++", DateUtil.now()); + } + + @Scheduled(cron = "0 0 2 * * ? ") + @Transactional(rollbackFor = Exception.class) + public void compressSourceJlV2() { + log.info("++++++++++++++++++++++source jlv2 compress start time {} ++++++++++++++++", DateUtil.now()); + DateTime startTime = DateUtil.beginOfDay(DateUtil.yesterday()); + DateTime endTime = DateUtil.endOfDay(DateUtil.yesterday()); + Integer total = tbJLV2BackdataEntityRepository.countByGmtCreate(startTime, endTime); + saveData(total, startTime, endTime, TableEnum.JL2.getTableName()); + log.info("++++++++++++++++++++++TfSource jlv2 Sync end time {} ++++++++++++++++", DateUtil.now()); + } + + @Scheduled(cron = "0 0 3 * * ? ") + @Transactional(rollbackFor = Exception.class) + public void compressSourceGdt() { + log.info("++++++++++++++++++++++source gdt compress start time {} ++++++++++++++++", DateUtil.now()); + DateTime startTime = DateUtil.beginOfDay(DateUtil.yesterday()); + DateTime endTime = DateUtil.endOfDay(DateUtil.yesterday()); + Integer total = tbGdtNewBackdataEntityRepository.countByGmtCreate(startTime, endTime); + saveData(total, startTime, endTime, TableEnum.GDT_NEW.getTableName()); + log.info("++++++++++++++++++++++TfSource gdt Sync end time {} ++++++++++++++++", DateUtil.now()); + } + + @Scheduled(cron = "0 0 4 * * ? ") + @Transactional(rollbackFor = Exception.class) + public void compressSourceKs() { + log.info("++++++++++++++++++++++source ks compress start time {} ++++++++++++++++", DateUtil.now()); + DateTime startTime = DateUtil.beginOfDay(DateUtil.yesterday()); + DateTime endTime = DateUtil.endOfDay(DateUtil.yesterday()); + Integer total = tbKSNewBackdataEntityRepository.countByGmtCreate(startTime, endTime); + saveData(total, startTime, endTime, TableEnum.KS_NEW.getTableName()); + log.info("++++++++++++++++++++++TfSource ks Sync end time {} ++++++++++++++++", DateUtil.now()); + } + + @Scheduled(cron = "0 0 5 * * ? ") + @Transactional(rollbackFor = Exception.class) + public void compressSourceBd() { + log.info("++++++++++++++++++++++source bd compress start time {} ++++++++++++++++", DateUtil.now()); + DateTime startTime = DateUtil.beginOfDay(DateUtil.yesterday()); + DateTime endTime = DateUtil.endOfDay(DateUtil.yesterday()); + Integer total = tbBDNewBackdataEntityRepository.countByGmtCreate(startTime, endTime); + saveData(total, startTime, endTime, TableEnum.BD_NEW.getTableName()); + log.info("++++++++++++++++++++++TfSource bd Sync end time {} ++++++++++++++++", DateUtil.now()); + } + + + /** + * 压缩记录入库,每次压缩5万 + */ + private void saveData(Integer total, DateTime startTime, DateTime endTime, String dataSourceName) { + if (total <= 0) { + return; + } + int startNum = 0; + int num = 100000; + int circulateNum = 0; + Map> map = new HashMap<>(16); + while (startNum < total) { + List tfSourceDTOS = queryData(dataSourceName, startTime, endTime, startNum, num); + if (CollUtil.isEmpty(tfSourceDTOS)) { + return; + } + //生成临时文件 + generateFile(tfSourceDTOS, map, circulateNum); + circulateNum += 1; + startNum += num; + } + String format = DateUtil.format(DateUtil.yesterday(), "yyyy-MM-dd"); + String filePath = path + File.separator + format; + List saveList = new ArrayList<>(); + + for (Map.Entry> entry : map.entrySet()) { + String password = RandomUtil.randomString(6); + String newPath = zipFile(filePath + File.separator + entry.getKey() + ".zip", entry.getValue(), password); + if (StrUtil.isNotBlank(filePath)) { + //保存文件记录 + TbTfSourceEntity tbTfSourceEntity = new TbTfSourceEntity().addTbTfSourceEntity(format, null, null, entry.getKey(), newPath, password); + saveList.add(tbTfSourceEntity); + } + } + tbTfSourceEntityRepository.saveAll(saveList); + } + + + public void generateFile(List list, Map> map, int circulateNum) { + //tag分组 + HashMap> mapByTag = new HashMap<>(list.stream().collect(Collectors.groupingBy(TfSourceDTO::getTag, Collectors.toList()))); + String format = DateUtil.format(DateUtil.yesterday(), "yyyy-MM-dd"); + //生成文件 + for (String tag : mapByTag.keySet()) { + String filePath = path + File.separator + format + File.separator + tag + circulateNum + ".csv"; + File file = new File(filePath); + CsvWriter writer = new CsvWriter(file, StandardCharsets.UTF_8); + writer.writeBeans(mapByTag.get(tag)); + writer.flush(); + writer.close(); + List mapValueList; + if (map.containsKey(tag)) { + mapValueList = map.get(tag); + } else { + mapValueList = new ArrayList<>(); + } + mapValueList.add(filePath); + map.put(tag, mapValueList); + } + } + + /** + * 生成加密压缩包 + * + * @param newPath 新地址 + * @param srcPath 临时地址 + * @param password 密码 + */ + private String zipFile(String newPath, List srcPath, String password) { + ZipParameters zipParameters = new ZipParameters(); + zipParameters.setEncryptionMethod(Zip4jConstants.COMP_DEFLATE); + zipParameters.setCompressionLevel(Zip4jConstants.DEFLATE_LEVEL_NORMAL); + if (StrUtil.isNotBlank(password)) { + zipParameters.setEncryptFiles(true); + zipParameters.setEncryptionMethod(Zip4jConstants.ENC_METHOD_STANDARD); + zipParameters.setPassword(password.toCharArray()); + } + try { + ZipFile zipFile = new ZipFile(newPath); + ArrayList files = new ArrayList(); + srcPath.forEach(path -> { + files.add(new File(path)); + }); + zipFile.addFiles(files, zipParameters); + for (String path : srcPath) { + FileUtil.del(path); + } + return newPath; + } catch (ZipException e) { + e.printStackTrace(); + } + return null; + } + + private List queryData(String tableName, DateTime startTime, DateTime endTime, Integer startNum, Integer endNum) { + String sql = "select imei,idfa,mac,oaid,tag,vpoint FROM " + tableName + + " where imei !='__IMEI__' and gmt_create > :startTime and gmt_create < :endTime ORDER BY id desc LIMIT :startNum , :endNum"; + Query selectQuery = entityManager.createNativeQuery(sql); + selectQuery.setParameter("startTime", DateUtil.format(startTime, "yyyy-MM-dd HH:mm:ss")); + selectQuery.setParameter("endTime", DateUtil.format(endTime, "yyyy-MM-dd HH:mm:ss")); + selectQuery.setParameter("startNum", startNum); + selectQuery.setParameter("endNum", endNum); + List list = selectQuery.getResultList(); + List tfSourceDTOS = new ArrayList<>(); + for (Object row : list) { + Object[] cells = (Object[]) row; + TfSourceDTO source = new TfSourceDTO(); + source.setImei((String) cells[0]); + source.setIdfa((String) cells[1]); + source.setMac((String) cells[2]); + source.setOaid((String) cells[3]); + source.setTag((String) cells[4]); + source.setVpoint((Integer) cells[5]); + tfSourceDTOS.add(source); + } + return tfSourceDTOS; + } +}