From dd907c021a36b59054e2c3068979d5654417d431 Mon Sep 17 00:00:00 2001 From: wujingtao Date: Tue, 30 Nov 2021 18:34:48 +0800 Subject: [PATCH] =?UTF-8?q?=E5=A2=9E=E5=8A=A0api=E6=93=8D=E4=BD=9C?= =?UTF-8?q?=E5=AE=9A=E6=97=B6=E4=BB=BB=E5=8A=A1=E7=9A=84=E9=80=BB=E8=BE=91?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../com/example/controller/JobController.java | 67 +++++ .../src/main/java/com/example/entity/Job.java | 128 ++++++++++ .../java/com/example/entity/JobBriefInfo.java | 48 ++++ .../java/com/example/serive/JobService.java | 42 ++++ .../example/serive/impl/JobServiceImpl.java | 232 ++++++++++++++++++ .../src/main/java/com/example/Demo2Job.java | 17 ++ .../src/main/java/com/example/DemoJob.java | 2 +- .../src/main/resources/application.yml | 4 +- pom.xml | 10 +- 9 files changed, 542 insertions(+), 8 deletions(-) create mode 100644 elastic-job-springboot-core/src/main/java/com/example/controller/JobController.java create mode 100644 elastic-job-springboot-core/src/main/java/com/example/entity/Job.java create mode 100644 elastic-job-springboot-core/src/main/java/com/example/entity/JobBriefInfo.java create mode 100644 elastic-job-springboot-core/src/main/java/com/example/serive/JobService.java create mode 100644 elastic-job-springboot-core/src/main/java/com/example/serive/impl/JobServiceImpl.java create mode 100644 elastic-job-springboot-test/src/main/java/com/example/Demo2Job.java rename {elastic-job-springboot-core => elastic-job-springboot-test}/src/main/resources/application.yml (61%) diff --git a/elastic-job-springboot-core/src/main/java/com/example/controller/JobController.java b/elastic-job-springboot-core/src/main/java/com/example/controller/JobController.java new file mode 100644 index 0000000..36e9526 --- /dev/null +++ b/elastic-job-springboot-core/src/main/java/com/example/controller/JobController.java @@ -0,0 +1,67 @@ +package com.example.controller; + +import com.example.entity.Job; +import com.example.entity.JobBriefInfo; +import com.example.serive.JobService; +import lombok.extern.slf4j.Slf4j; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RequestBody; +import org.springframework.web.bind.annotation.RestController; + +import javax.annotation.Resource; +import java.util.List; + +/** + * @author wjt + * @date 2021/11/26 + */ +@Slf4j +@RestController +public class JobController { + @Resource + private JobService jobService; + + /** + * 注册新任务 + * + * @param job + */ + @PostMapping("/add/job") + public void addJob(@RequestBody Job job) { + log.info("++++++++++++++++++++++++++++++++++++++++++++++++注册定时任务:{}", job.toString()); + jobService.addJob(job); + } + + /** + * 移除任务 + * + * @param jobName + */ + @GetMapping("/remove/job") + public void remove(String jobName) { + log.info("=+++++++++++++++++++++++++++++++++++++++++++++++删除定时任务:{}", jobName); + jobService.removeJob(jobName); + } + + /** + * 获取任务信息 + * + * @param jobName + * @return + */ + @GetMapping("/get/job") + public Job getJobDetails(String jobName) { + return jobService.getJobDetail(jobName); + } + + /** + * 获取所有任务的信息 + * + * @return + */ + @GetMapping("/get/all") + public List getAllJobsDetails() { + return jobService.getAllJobsDetails(); + } +} diff --git a/elastic-job-springboot-core/src/main/java/com/example/entity/Job.java b/elastic-job-springboot-core/src/main/java/com/example/entity/Job.java new file mode 100644 index 0000000..6951072 --- /dev/null +++ b/elastic-job-springboot-core/src/main/java/com/example/entity/Job.java @@ -0,0 +1,128 @@ +package com.example.entity; + +import com.sun.istack.internal.NotNull; +import lombok.Data; + +import java.io.Serializable; + +/** + * @author wjt + * @date 2021/11/26 + */ +@Data +public class Job implements Serializable { + private static final long serialVersionUID = -7135891833447229851L; + + public Job() { + } + + /** + * 任务名称 + */ + @NotNull + private String jobName; + /** + * 任务类型 + */ + private String jobType; + /** + * 任务实例类路径 + */ + @NotNull + + private String jobClass; + /** + * 执行时间表达式 + */ + @NotNull + private String cron; + /** + * 总分片数 + */ + private int shardingTotalCount = 1; + private Boolean overwrite = true; + /** + * 分片序列号/参数对照 多个键值对用逗号分隔 + * 分片序列号从0开始,不可大于或等于作业分片总数 + * 如0=a,1=b,2=c + */ + private String shardingItemParameters=""; + /** + * 自定义的参数,可通过传递该参数为作业调度的业务方法传参 + */ + private String jobParameter=""; + /** + * 监控作业运行时状态 + * 每次作业执行时间和间隔时间均较长的情况,建议监控作业运行时状态,可保证数据不会重复选取 + */ + private boolean monitorExecution = true; + /** + * 是否流式处理数据 + * 如果非流式处理数据, 则处理数据完成后作业结束 + */ + private boolean streamingProcess = false; + /** + * 最大允许的本机与注册中心的时间误差秒数 + * >配置为-1表示不校验时间误差 + */ + private int maxTimeDiffSeconds = -1; + /** + * 作业监控端口 + */ + private int monitorPort = -1; + /** + * 是否开启任务执行失效转移,开启表示如果作业在一次任务执行中途宕机,允许将该次未完成的任务在另一作业节点上补偿执行 + */ + private boolean failover = false; + /** + * 是否开启错过任务重新执行 + */ + private boolean misfire = false; + /** + * 作业分片策略实现类全路径,默认使用平均分配策略 + */ + private String jobShardingStrategyClass = ""; + /** + * 作业描述 + */ + private String description=""; + /** + * 扩展属性 + */ + private JobProperties jobProperties = new JobProperties(); + /** + * 脚本型作业执行命令行 + */ + private String scriptCommandLine=""; + /** + * 修复作业服务器不一致状态服务调度间隔时间,配置为小于1的任意值表示不执行修复,单位:分钟 + */ + private int reconcileIntervalMinutes = 10; + /** + * 作业是否禁止启动,可用于部署作业时,先禁止启动,部署结束后统一启动 + */ + private boolean disabled = false; + /** + * 前置后置任务监听实现类,需实现ElasticJobListener接口 + * @return + */ + private String listener = ""; + /** + * 前置后置任务分布式监听实现类,需继承AbstractDistributeOnceElasticJobListener类 + * @return + */ + private String distributedListener = ""; + + /** + * 最后一个作业执行前的执行方法的超时时间,单位:毫秒 + * @return + */ + private long startedTimeoutMilliseconds = Long.MAX_VALUE; + + /** + * 最后一个作业执行后的执行方法的超时时间,单位:毫秒 + * @return + */ + private long completedTimeoutMilliseconds = Long.MAX_VALUE; + +} diff --git a/elastic-job-springboot-core/src/main/java/com/example/entity/JobBriefInfo.java b/elastic-job-springboot-core/src/main/java/com/example/entity/JobBriefInfo.java new file mode 100644 index 0000000..18b51b5 --- /dev/null +++ b/elastic-job-springboot-core/src/main/java/com/example/entity/JobBriefInfo.java @@ -0,0 +1,48 @@ +package com.example.entity; + +import lombok.Data; + +import java.io.Serializable; + +/** + * @author wjt + * @date 2021/11/30 + */ +@Data +public class JobBriefInfo implements Serializable,Comparable { + private static final long serialVersionUID = -2150981397641113926L; + + private String jobName; + private JobBriefInfo.JobStatus status; + private String description; + private String cron; + private int instanceCount; + private int shardingTotalCount; + + @Override + public int compareTo(JobBriefInfo o) { + return this.getJobName().compareTo(o.getJobName()); + } + + public static enum JobStatus { + /** + * 正常 + */ + OK, + /** + * 下线 + */ + CRASHED, + /** + * 失效 + */ + DISABLED, + /** + * 分片中 + */ + SHARDING_FLAG; + + private JobStatus() { + } + } +} diff --git a/elastic-job-springboot-core/src/main/java/com/example/serive/JobService.java b/elastic-job-springboot-core/src/main/java/com/example/serive/JobService.java new file mode 100644 index 0000000..e8157f9 --- /dev/null +++ b/elastic-job-springboot-core/src/main/java/com/example/serive/JobService.java @@ -0,0 +1,42 @@ +package com.example.serive; + +import com.example.entity.Job; +import com.example.entity.JobBriefInfo; + +import java.util.List; + +/** + * @author wjt + * @date 2021/11/26 + */ +public interface JobService { + + /** + * 注册job + * + * @param job + */ + void addJob(Job job); + + /** + * 删除任务 + * + * @param jobName + */ + void removeJob(String jobName); + + /** + * 查询job信息 + * + * @param jobName + * @return + */ + Job getJobDetail(String jobName); + + /** + * 查询所有的任务基本信息 + * + * @return + */ + List getAllJobsDetails(); +} diff --git a/elastic-job-springboot-core/src/main/java/com/example/serive/impl/JobServiceImpl.java b/elastic-job-springboot-core/src/main/java/com/example/serive/impl/JobServiceImpl.java new file mode 100644 index 0000000..b95ed42 --- /dev/null +++ b/elastic-job-springboot-core/src/main/java/com/example/serive/impl/JobServiceImpl.java @@ -0,0 +1,232 @@ +package com.example.serive.impl; + +import com.dangdang.ddframe.job.config.JobCoreConfiguration; +import com.dangdang.ddframe.job.config.simple.SimpleJobConfiguration; +import com.dangdang.ddframe.job.executor.handler.JobProperties; +import com.dangdang.ddframe.job.lite.config.LiteJobConfiguration; +import com.dangdang.ddframe.job.lite.internal.config.LiteJobConfigurationGsonFactory; +import com.dangdang.ddframe.job.lite.internal.storage.JobNodePath; +import com.dangdang.ddframe.job.lite.spring.api.SpringJobScheduler; +import com.dangdang.ddframe.job.reg.base.CoordinatorRegistryCenter; +import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperRegistryCenter; +import com.example.entity.Job; +import com.example.entity.JobBriefInfo; +import com.example.serive.JobService; +import lombok.extern.slf4j.Slf4j; +import org.apache.curator.framework.CuratorFramework; +import org.springframework.beans.factory.config.BeanDefinition; +import org.springframework.beans.factory.support.BeanDefinitionBuilder; +import org.springframework.beans.factory.support.DefaultListableBeanFactory; +import org.springframework.beans.factory.support.ManagedList; +import org.springframework.context.ApplicationContext; +import org.springframework.stereotype.Service; +import org.springframework.util.StringUtils; + +import javax.annotation.Resource; +import java.util.*; + +/** + * @author wjt + * @date 2021/11/26 + */ +@Slf4j +@Service +public class JobServiceImpl implements JobService { + @Resource + private CoordinatorRegistryCenter center; + @Resource + private ZookeeperRegistryCenter zookeeperRegistryCenter; + // @Resource +// private DataSource dataSource; + @Resource + private ApplicationContext ctx; + + @Override + public void addJob(Job job) { + boolean overwrite = job.getOverwrite(); + JobCoreConfiguration jcc = JobCoreConfiguration + .newBuilder(job.getJobName(), job.getCron(), job.getShardingTotalCount()) + .shardingItemParameters(job.getShardingItemParameters()) + .description(job.getDescription()) + .failover(job.isFailover()) + .jobParameter(job.getJobParameter()) + .misfire(job.isMisfire()) + .jobProperties(JobProperties.JobPropertiesEnum.JOB_EXCEPTION_HANDLER.getKey(), job.getJobProperties().getJobExceptionHandler()) + .jobProperties(JobProperties.JobPropertiesEnum.EXECUTOR_SERVICE_HANDLER.getKey(), job.getJobProperties().getExecutorServiceHandler()) + .build(); + SimpleJobConfiguration sjc = new SimpleJobConfiguration( + jcc, job.getJobClass() + ); + LiteJobConfiguration ljc = LiteJobConfiguration + .newBuilder(sjc) + .overwrite(overwrite) + .disabled(job.isDisabled()) + .monitorPort(job.getMonitorPort()) + .monitorExecution(job.isMonitorExecution()) + .maxTimeDiffSeconds(job.getMaxTimeDiffSeconds()) + .jobShardingStrategyClass(job.getJobShardingStrategyClass()) + .reconcileIntervalMinutes(job.getReconcileIntervalMinutes()) + .build(); + // 配置数据源 +// JobEventConfiguration jec = new JobEventRdbConfiguration(dataSource); + //配置监听 +// MyNormalListener myNormalListener = new MyNormalListener(); + List elasticJobListeners = getTargetElasticJobListeners(job); + + // 构建SpringJobScheduler对象来初始化任务 + BeanDefinitionBuilder factory = BeanDefinitionBuilder.rootBeanDefinition(SpringJobScheduler.class); + factory.setScope(BeanDefinition.SCOPE_PROTOTYPE); +// 脚步执行类型 +// factory.addConstructorArgValue(null); +// factory.addConstructorArgValue(jec); + BeanDefinitionBuilder rdbFactory = BeanDefinitionBuilder.rootBeanDefinition(job.getJobClass()); + factory.addConstructorArgValue(rdbFactory.getBeanDefinition()); + factory.addConstructorArgValue(zookeeperRegistryCenter); + factory.addConstructorArgValue(ljc); + + factory.addConstructorArgValue(elasticJobListeners); + DefaultListableBeanFactory defaultListableBeanFactory = (DefaultListableBeanFactory) ctx.getAutowireCapableBeanFactory(); + defaultListableBeanFactory.registerBeanDefinition("SpringJobScheduler" + job.getJobName(), factory.getBeanDefinition()); + SpringJobScheduler springJobScheduler = (SpringJobScheduler) ctx.getBean("SpringJobScheduler" + job.getJobName()); + springJobScheduler.init(); + log.info("【" + job.getJobName() + "】\t" + job.getJobClass() + "\t 注册成功"); + } + + @Override + public void removeJob(String jobName) { + try { + CuratorFramework client = zookeeperRegistryCenter.getClient(); + client.delete().deletingChildrenIfNeeded().forPath("/" + jobName); + } catch (Exception e) { + log.error("删除任务:{} 错误 {}", jobName, e.getMessage()); + } + } + + /** + * 获取任务的详情 + */ + @Override + public Job getJobDetail(String jobName) { + Job job = new Job(); + try { + JobNodePath jobNodePath = new JobNodePath(jobName); + LiteJobConfiguration liteJobConfig = LiteJobConfigurationGsonFactory.fromJson(center.get(jobNodePath.getConfigNodePath())); + job.setJobType(liteJobConfig.getTypeConfig().getJobType().name()); + this.buildSimpleJobSettings(jobName, job, liteJobConfig); + } catch (Exception e) { + log.error("查询job任务:{} 错误 {}", jobName, e.getMessage()); + } + return job; + } + + @Override + public List getAllJobsDetails() { + List jobNames = zookeeperRegistryCenter.getChildrenKeys("/"); + List result = new ArrayList<>(jobNames.size()); + + for (String each : jobNames) { + JobBriefInfo jobBriefInfo = this.getJobBriefInfo(each); + if (null != jobBriefInfo) { + result.add(jobBriefInfo); + } + } + Collections.sort(result); + return result; + } + + private List getTargetElasticJobListeners(Job job) { + List result = new ManagedList<>(2); + String listeners = job.getListener(); + if (StringUtils.hasText(listeners)) { + BeanDefinitionBuilder factory = BeanDefinitionBuilder.rootBeanDefinition(listeners); + factory.setScope(BeanDefinition.SCOPE_PROTOTYPE); + result.add(factory.getBeanDefinition()); + } + + String distributedListeners = job.getDistributedListener(); + long startedTimeoutMilliseconds = job.getStartedTimeoutMilliseconds(); + long completedTimeoutMilliseconds = job.getCompletedTimeoutMilliseconds(); + + if (StringUtils.hasText(distributedListeners)) { + BeanDefinitionBuilder factory = BeanDefinitionBuilder.rootBeanDefinition(distributedListeners); + factory.setScope(BeanDefinition.SCOPE_PROTOTYPE); + factory.addConstructorArgValue(startedTimeoutMilliseconds); + factory.addConstructorArgValue(completedTimeoutMilliseconds); + result.add(factory.getBeanDefinition()); + } + return result; + } + + private void buildSimpleJobSettings(String jobName, Job job, LiteJobConfiguration liteJobConfig) { + job.setJobName(jobName); + job.setJobType(liteJobConfig.getTypeConfig().getJobType().name()); + job.setJobClass(liteJobConfig.getTypeConfig().getJobClass()); + job.setShardingTotalCount(liteJobConfig.getTypeConfig().getCoreConfig().getShardingTotalCount()); + job.setCron(liteJobConfig.getTypeConfig().getCoreConfig().getCron()); + job.setShardingItemParameters(liteJobConfig.getTypeConfig().getCoreConfig().getShardingItemParameters()); + job.setJobParameter(liteJobConfig.getTypeConfig().getCoreConfig().getJobParameter()); + job.setMonitorExecution(liteJobConfig.isMonitorExecution()); + job.setMaxTimeDiffSeconds(liteJobConfig.getMaxTimeDiffSeconds()); + job.setMonitorPort(liteJobConfig.getMonitorPort()); + job.setFailover(liteJobConfig.getTypeConfig().getCoreConfig().isFailover()); + job.setMisfire(liteJobConfig.getTypeConfig().getCoreConfig().isMisfire()); + job.setJobShardingStrategyClass(liteJobConfig.getJobShardingStrategyClass()); + job.setDescription(liteJobConfig.getTypeConfig().getCoreConfig().getDescription()); + job.setReconcileIntervalMinutes(liteJobConfig.getReconcileIntervalMinutes()); + job.setOverwrite(liteJobConfig.isOverwrite()); + } + + public JobBriefInfo getJobBriefInfo(String jobName) { + JobNodePath jobNodePath = new JobNodePath(jobName); + JobBriefInfo result = new JobBriefInfo(); + result.setJobName(jobName); + String liteJobConfigJson = center.get(jobNodePath.getConfigNodePath()); + if (null == liteJobConfigJson) { + return null; + } else { + LiteJobConfiguration liteJobConfig = LiteJobConfigurationGsonFactory.fromJson(liteJobConfigJson); + result.setDescription(liteJobConfig.getTypeConfig().getCoreConfig().getDescription()); + result.setCron(liteJobConfig.getTypeConfig().getCoreConfig().getCron()); + result.setInstanceCount(center.getChildrenKeys((new JobNodePath(jobName)).getInstancesNodePath()).size()); + result.setShardingTotalCount(liteJobConfig.getTypeConfig().getCoreConfig().getShardingTotalCount()); + result.setStatus(this.getJobStatus(jobName)); + return result; + } + } + + private JobBriefInfo.JobStatus getJobStatus(String jobName) { + JobNodePath jobNodePath = new JobNodePath(jobName); + List instances = center.getChildrenKeys(jobNodePath.getInstancesNodePath()); + if (instances.isEmpty()) { + return JobBriefInfo.JobStatus.CRASHED; + } else if (this.isAllDisabled(jobNodePath)) { + return JobBriefInfo.JobStatus.DISABLED; + } else { + return this.isHasShardingFlag(jobNodePath, instances) ? JobBriefInfo.JobStatus.SHARDING_FLAG : JobBriefInfo.JobStatus.OK; + } + } + + private boolean isAllDisabled(JobNodePath jobNodePath) { + List serversPath = zookeeperRegistryCenter.getChildrenKeys(jobNodePath.getServerNodePath()); + int disabledServerCount = 0; + for (String each : serversPath) { + if (JobBriefInfo.JobStatus.DISABLED.name().equals(center.get(jobNodePath.getServerNodePath(each)))) { + ++disabledServerCount; + } + } + + return disabledServerCount == serversPath.size(); + } + + private boolean isHasShardingFlag(JobNodePath jobNodePath, List instances) { + Set shardingInstances = new HashSet<>(); + for (String each : center.getChildrenKeys(jobNodePath.getShardingNodePath())) { + String instanceId = center.get(jobNodePath.getShardingNodePath(each, "instance")); + if (null != instanceId && !instanceId.isEmpty()) { + shardingInstances.add(instanceId); + } + } + + return !instances.containsAll(shardingInstances) || shardingInstances.isEmpty(); + } +} diff --git a/elastic-job-springboot-test/src/main/java/com/example/Demo2Job.java b/elastic-job-springboot-test/src/main/java/com/example/Demo2Job.java new file mode 100644 index 0000000..4c84dbc --- /dev/null +++ b/elastic-job-springboot-test/src/main/java/com/example/Demo2Job.java @@ -0,0 +1,17 @@ +package com.example; + +import com.dangdang.ddframe.job.api.ShardingContext; +import com.dangdang.ddframe.job.api.simple.SimpleJob; +import lombok.extern.slf4j.Slf4j; + +/** + * @author wjt + * @date 2021/11/26 + */ +@Slf4j +public class Demo2Job implements SimpleJob { + @Override + public void execute(ShardingContext shardingContext) { + log.info("注册成功"); + } +} diff --git a/elastic-job-springboot-test/src/main/java/com/example/DemoJob.java b/elastic-job-springboot-test/src/main/java/com/example/DemoJob.java index b1fa6d3..4cc0bbd 100644 --- a/elastic-job-springboot-test/src/main/java/com/example/DemoJob.java +++ b/elastic-job-springboot-test/src/main/java/com/example/DemoJob.java @@ -15,7 +15,7 @@ import java.time.format.DateTimeFormatter; */ @Slf4j @ElasticSimpleJob( - jobName = "DemoJob", cron = "0 0/1 * * * ?", shardingTotalCount = 1, overwrite = true, jobEvent = false) + jobName = "DemoJob", cron = "0/30 * * * * ?", shardingTotalCount = 1, overwrite = true, jobEvent = false) @Component public class DemoJob implements SimpleJob { @Override diff --git a/elastic-job-springboot-core/src/main/resources/application.yml b/elastic-job-springboot-test/src/main/resources/application.yml similarity index 61% rename from elastic-job-springboot-core/src/main/resources/application.yml rename to elastic-job-springboot-test/src/main/resources/application.yml index e7ee043..dcc92cc 100644 --- a/elastic-job-springboot-core/src/main/resources/application.yml +++ b/elastic-job-springboot-test/src/main/resources/application.yml @@ -1,7 +1,7 @@ server: - port: 8081 + port: 8082 elasticjob: zookeeper: namespace: springboot-elasticjob - server-list: localhost:2181 + server-list: localhost:2181 \ No newline at end of file diff --git a/pom.xml b/pom.xml index 9662969..2df7453 100644 --- a/pom.xml +++ b/pom.xml @@ -45,11 +45,11 @@ elastic-job-lite-core 2.1.5 - - - - - + + com.dangdang + elastic-job-lite-spring + 2.1.5 + ${project.name}