diff --git a/elastic-job-springboot-core/src/main/java/com/example/config/SimpleJobAutoConfig.java b/elastic-job-springboot-core/src/main/java/com/example/config/SimpleJobAutoConfig.java index 94d6deb..7b0e90b 100644 --- a/elastic-job-springboot-core/src/main/java/com/example/config/SimpleJobAutoConfig.java +++ b/elastic-job-springboot-core/src/main/java/com/example/config/SimpleJobAutoConfig.java @@ -3,9 +3,9 @@ package com.example.config; import com.dangdang.ddframe.job.api.simple.SimpleJob; import com.dangdang.ddframe.job.config.JobCoreConfiguration; import com.dangdang.ddframe.job.config.simple.SimpleJobConfiguration; -import com.dangdang.ddframe.job.lite.api.JobScheduler; -import com.dangdang.ddframe.job.lite.api.listener.ElasticJobListener; +import com.dangdang.ddframe.job.lite.api.strategy.impl.AverageAllocationJobShardingStrategy; import com.dangdang.ddframe.job.lite.config.LiteJobConfiguration; +import com.dangdang.ddframe.job.lite.spring.api.SpringJobScheduler; import com.dangdang.ddframe.job.reg.base.CoordinatorRegistryCenter; import com.example.jobInstance.ElasticSimpleJob; import lombok.extern.slf4j.Slf4j; @@ -13,13 +13,16 @@ import org.springframework.boot.autoconfigure.AutoConfigureAfter; import org.springframework.boot.autoconfigure.condition.ConditionalOnBean; import org.springframework.context.ApplicationContext; import org.springframework.context.annotation.Configuration; +import org.springframework.util.StringUtils; import javax.annotation.PostConstruct; import javax.annotation.Resource; +import javax.sql.DataSource; import java.util.Map; /** - * @author q + * @author wjt + * @date 2023/7/19 */ @Slf4j @Configuration @@ -32,43 +35,58 @@ public class SimpleJobAutoConfig { @Resource private ApplicationContext applicationContext; + // @Resource // private DataSource dataSource; + @PostConstruct public void initSimpleJob() { - Map beans = applicationContext.getBeansWithAnnotation(ElasticSimpleJob.class); - beans.forEach((key, instance) -> { - Class[] interfaces = instance.getClass().getInterfaces(); - for (Class anInterface : interfaces) { - if (anInterface == SimpleJob.class) { - ElasticSimpleJob annotation = instance.getClass().getAnnotation(ElasticSimpleJob.class); - String jobName = annotation.jobName(); - String cron = annotation.cron(); - int shardingTotalCount = annotation.shardingTotalCount(); - boolean overwrite = annotation.overwrite(); - Class jobStrategy = annotation.jobStrategy(); - boolean jobEvent = annotation.jobEvent(); - Class[] jobListeners = annotation.jobListeners(); - JobCoreConfiguration jcc = JobCoreConfiguration - .newBuilder(jobName, cron, shardingTotalCount) - .build(); - SimpleJobConfiguration sjc = new SimpleJobConfiguration( - jcc, - instance.getClass().getCanonicalName() - ); - LiteJobConfiguration ljc = LiteJobConfiguration - .newBuilder(sjc) - .jobShardingStrategyClass(jobStrategy.getCanonicalName()) - .overwrite(overwrite) - .build(); - // 配置数据源 -// JobEventConfiguration jec = new JobEventRdbConfiguration(dataSource); - //配置监听 -// MyNormalListener myNormalListener = new MyNormalListener(); - new JobScheduler(center, ljc).init(); - } + Map map = applicationContext.getBeansOfType(SimpleJob.class); + for (Map.Entry entry : map.entrySet()) { + //任务类 + SimpleJob simpleJob = entry.getValue(); + //注解实体 + ElasticSimpleJob annotation = simpleJob.getClass().getAnnotation(ElasticSimpleJob.class); + if (annotation == null) { + continue; } - }); + + String jobName = annotation.jobName(); + String cron = annotation.cron(); + int shardingTotalCount = annotation.shardingTotalCount(); + //是否刷新zk + boolean overwrite = annotation.overwrite(); + //分片参数 + String parameters = annotation.shardingItemParameters(); + //是否使用事件追踪 + boolean jobEvent = annotation.jobEvent(); + + JobCoreConfiguration.Builder jccBuilder = JobCoreConfiguration + .newBuilder(jobName, cron, shardingTotalCount); + if (!StringUtils.isEmpty(parameters)) { + jccBuilder.shardingItemParameters(parameters); + } + + SimpleJobConfiguration sjc = new SimpleJobConfiguration( + jccBuilder.build(), + simpleJob.getClass().getCanonicalName() + ); + LiteJobConfiguration ljc = LiteJobConfiguration + .newBuilder(sjc) + //分片策略,可以写进注解 + .jobShardingStrategyClass(AverageAllocationJobShardingStrategy.class.getCanonicalName()) + .overwrite(overwrite) + .build(); + // 配置数据源,事件追踪 +// if (jobEvent) { +// JobEventConfiguration jec = new JobEventRdbConfiguration(dataSource); +// new SpringJobScheduler(simpleJob, center, ljc, jec).init(); +// } else { +// new SpringJobScheduler(simpleJob, center, ljc).init(); +// } + + new SpringJobScheduler(simpleJob, center, ljc).init(); + } } } \ No newline at end of file diff --git a/elastic-job-springboot-core/src/main/java/com/example/config/ZookeeperAutoConfig.java b/elastic-job-springboot-core/src/main/java/com/example/config/ZookeeperAutoConfig.java index 1080e6e..b833fca 100644 --- a/elastic-job-springboot-core/src/main/java/com/example/config/ZookeeperAutoConfig.java +++ b/elastic-job-springboot-core/src/main/java/com/example/config/ZookeeperAutoConfig.java @@ -3,7 +3,6 @@ package com.example.config; import com.dangdang.ddframe.job.reg.base.CoordinatorRegistryCenter; import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperConfiguration; import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperRegistryCenter; -import com.example.config.ZookeeperProperties; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.annotation.Bean; diff --git a/elastic-job-springboot-core/src/main/java/com/example/entity/Job.java b/elastic-job-springboot-core/src/main/java/com/example/entity/Job.java index 6d76836..6f20bec 100644 --- a/elastic-job-springboot-core/src/main/java/com/example/entity/Job.java +++ b/elastic-job-springboot-core/src/main/java/com/example/entity/Job.java @@ -1,12 +1,8 @@ package com.example.entity; -import com.dangdang.ddframe.job.executor.handler.JobProperties; -import com.sun.istack.internal.NotNull; import lombok.Data; import java.io.Serializable; -import java.util.LinkedHashMap; -import java.util.Map; /** * @author wjt @@ -22,85 +18,42 @@ public class Job implements Serializable { /** * 任务名称 */ - @NotNull private String jobName; - /** - * 任务类型 - */ - private String jobType; - /** - * 任务实例类路径 - */ - @NotNull - private String jobClass; /** * 执行时间表达式 */ - @NotNull private String cron; /** * 总分片数 */ private int shardingTotalCount = 1; + + /** + * 任务实例类路径 + */ + private String jobClass; + /** * 分片序列号/参数对照 多个键值对用逗号分隔 * 分片序列号从0开始,不可大于或等于作业分片总数 * 如0=a,1=b,2=c */ private String shardingItemParameters = ""; + /** * 自定义的参数,可通过传递该参数为作业调度的业务方法传参 */ private String jobParameter = ""; - /** - * 监控作业运行时状态 - * 每次作业执行时间和间隔时间均较长的情况,建议监控作业运行时状态,可保证数据不会重复选取 - */ - private boolean monitorExecution = true; - /** - * 是否流式处理数据 - * 如果非流式处理数据, 则处理数据完成后作业结束 - */ - private boolean streamingProcess = false; - /** - * 最大允许的本机与注册中心的时间误差秒数 - * >配置为-1表示不校验时间误差 - */ - private int maxTimeDiffSeconds = -1; - /** - * 作业监控端口 - */ - private int monitorPort = -1; - /** - * 是否开启任务执行失效转移,开启表示如果作业在一次任务执行中途宕机,允许将该次未完成的任务在另一作业节点上补偿执行 - */ - private boolean failover = false; - /** - * 是否开启错过任务重新执行 - */ - private boolean misfire = false; - /** - * 作业分片策略实现类全路径,默认使用平均分配策略 - */ - private String jobShardingStrategyClass = ""; + /** * 作业描述 */ private String description = ""; + private Boolean overwrite = true; /** - * 脚本型作业执行命令行 - */ - private String scriptCommandLine = ""; - /** - * 修复作业服务器不一致状态服务调度间隔时间,配置为小于1的任意值表示不执行修复,单位:分钟 - */ - private int reconcileIntervalMinutes = 10; - /** - * 异常定义 + * 是否使用事件追踪,需要datasource */ - private Map jobProperties = new LinkedHashMap(JobProperties.JobPropertiesEnum.values().length, 1.0F); - - private Boolean overwrite = true; + boolean jobEvent = false; } diff --git a/elastic-job-springboot-core/src/main/java/com/example/jobInstance/ElasticSimpleJob.java b/elastic-job-springboot-core/src/main/java/com/example/jobInstance/ElasticSimpleJob.java index f34cac5..878276d 100644 --- a/elastic-job-springboot-core/src/main/java/com/example/jobInstance/ElasticSimpleJob.java +++ b/elastic-job-springboot-core/src/main/java/com/example/jobInstance/ElasticSimpleJob.java @@ -1,7 +1,5 @@ package com.example.jobInstance; -import com.dangdang.ddframe.job.lite.api.listener.ElasticJobListener; -import com.dangdang.ddframe.job.lite.api.strategy.impl.AverageAllocationJobShardingStrategy; import org.springframework.stereotype.Component; import java.lang.annotation.ElementType; @@ -10,7 +8,8 @@ import java.lang.annotation.RetentionPolicy; import java.lang.annotation.Target; /** - * @author q + * @author wjt + * @date 2023/7/19 */ @Target(ElementType.TYPE) @Retention(RetentionPolicy.RUNTIME) @@ -33,23 +32,18 @@ public @interface ElasticSimpleJob { int shardingTotalCount() default 1; /** - * 是否刷新到Zookeeper + * 分片参数 */ - boolean overwrite() default false; + String shardingItemParameters() default ""; /** - * 使用的分片策略 + * 是否刷新到Zookeeper */ - Class jobStrategy() default AverageAllocationJobShardingStrategy.class; + boolean overwrite() default true; /** - * 是否使用事件追踪 + * 是否使用事件追踪,需要datasource + * 目前未做配置 */ boolean jobEvent() default false; - - /** - * 是否使用事件监听器 - */ - Class[] jobListeners() default {}; - } diff --git a/elastic-job-springboot-core/src/main/java/com/example/serive/JobService.java b/elastic-job-springboot-core/src/main/java/com/example/serive/JobService.java index 5183232..435f87d 100644 --- a/elastic-job-springboot-core/src/main/java/com/example/serive/JobService.java +++ b/elastic-job-springboot-core/src/main/java/com/example/serive/JobService.java @@ -46,4 +46,18 @@ public interface JobService { * @param job */ void updateJob(Job job); + + /** + * 暂停 + * + * @param jobName + */ + void pauseJob(String jobName); + + /** + * 启动 + * + * @param jobName + */ + void start(String jobName); } diff --git a/elastic-job-springboot-core/src/main/java/com/example/serive/impl/JobServiceImpl.java b/elastic-job-springboot-core/src/main/java/com/example/serive/impl/JobServiceImpl.java index 2e7c94b..bb2427d 100644 --- a/elastic-job-springboot-core/src/main/java/com/example/serive/impl/JobServiceImpl.java +++ b/elastic-job-springboot-core/src/main/java/com/example/serive/impl/JobServiceImpl.java @@ -1,12 +1,15 @@ package com.example.serive.impl; +import com.dangdang.ddframe.job.api.ElasticJob; import com.dangdang.ddframe.job.config.JobCoreConfiguration; import com.dangdang.ddframe.job.config.simple.SimpleJobConfiguration; -import com.dangdang.ddframe.job.executor.handler.JobProperties; -import com.dangdang.ddframe.job.lite.api.JobScheduler; +import com.dangdang.ddframe.job.lite.api.strategy.impl.AverageAllocationJobShardingStrategy; import com.dangdang.ddframe.job.lite.config.LiteJobConfiguration; import com.dangdang.ddframe.job.lite.internal.config.LiteJobConfigurationGsonFactory; +import com.dangdang.ddframe.job.lite.internal.schedule.JobRegistry; +import com.dangdang.ddframe.job.lite.internal.schedule.JobScheduleController; import com.dangdang.ddframe.job.lite.internal.storage.JobNodePath; +import com.dangdang.ddframe.job.lite.spring.api.SpringJobScheduler; import com.dangdang.ddframe.job.reg.base.CoordinatorRegistryCenter; import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperRegistryCenter; import com.example.entity.Job; @@ -15,10 +18,10 @@ import com.example.serive.JobService; import com.google.common.base.Preconditions; import com.google.common.base.Strings; import lombok.extern.slf4j.Slf4j; -import org.apache.curator.framework.CuratorFramework; import org.springframework.stereotype.Service; import javax.annotation.Resource; +import javax.sql.DataSource; import java.util.*; /** @@ -32,9 +35,9 @@ public class JobServiceImpl implements JobService { private CoordinatorRegistryCenter center; @Resource private ZookeeperRegistryCenter zookeeperRegistryCenter; - - // @Resource +// @Resource // private DataSource dataSource; + @Override public void addJob(Job job) { boolean overwrite = job.getOverwrite(); @@ -42,11 +45,8 @@ public class JobServiceImpl implements JobService { .newBuilder(job.getJobName(), job.getCron(), job.getShardingTotalCount()) .shardingItemParameters(job.getShardingItemParameters()) .description(job.getDescription()) - .failover(job.isFailover()) .jobParameter(job.getJobParameter()) - .misfire(job.isMisfire()) - .jobProperties(JobProperties.JobPropertiesEnum.JOB_EXCEPTION_HANDLER.getKey(), job.getJobProperties().get("JOB_EXCEPTION_HANDLER")) - .jobProperties(JobProperties.JobPropertiesEnum.EXECUTOR_SERVICE_HANDLER.getKey(), job.getJobProperties().get("EXECUTOR_SERVICE_HANDLER")) + .misfire(true) .build(); SimpleJobConfiguration sjc = new SimpleJobConfiguration( jcc, job.getJobClass() @@ -54,26 +54,61 @@ public class JobServiceImpl implements JobService { LiteJobConfiguration ljc = LiteJobConfiguration .newBuilder(sjc) .overwrite(overwrite) - .monitorPort(job.getMonitorPort()) - .monitorExecution(job.isMonitorExecution()) - .maxTimeDiffSeconds(job.getMaxTimeDiffSeconds()) - .jobShardingStrategyClass(job.getJobShardingStrategyClass()) - .reconcileIntervalMinutes(job.getReconcileIntervalMinutes()) + .jobShardingStrategyClass(AverageAllocationJobShardingStrategy.class.getCanonicalName()) .build(); - // 配置数据源 -// JobEventConfiguration jec = new JobEventRdbConfiguration(dataSource); - //配置监听 -// MyNormalListener myNormalListener = new MyNormalListener(); - new JobScheduler(center, ljc).init(); + try { + ElasticJob elasticJob = (ElasticJob) Class.forName(job.getJobClass()).newInstance(); + // 配置数据源 +// if (job.isJobEvent()) { +// JobEventConfiguration jec = new JobEventRdbConfiguration(dataSource); +// new SpringJobScheduler(elasticJob, center, ljc, jec); +// } else { +// new SpringJobScheduler(elasticJob, center, ljc); +// } + new SpringJobScheduler(elasticJob, center, ljc); + } catch (Exception e) { + e.printStackTrace(); + log.error("加载任务失败,任务为 {}", job.getJobName()); + } } @Override public void removeJob(String jobName) { - try { - CuratorFramework client = zookeeperRegistryCenter.getClient(); - client.delete().deletingChildrenIfNeeded().forPath("/" + jobName); - } catch (Exception e) { - log.error("删除任务:{} 错误 {}", jobName, e.getMessage()); + JobScheduleController jobScheduleController = JobRegistry.getInstance().getJobScheduleController(jobName); + if (jobScheduleController != null) { + jobScheduleController.pauseJob(); + jobScheduleController.shutdown(); + zookeeperRegistryCenter.remove("/" + jobName); + + } + //zk直接删除任务 +// try { +// CuratorFramework client = zookeeperRegistryCenter.getClient(); +// client.delete().deletingChildrenIfNeeded().forPath("/" + jobName); +// } catch (Exception e) { +// log.error("删除任务:{} 错误 {}", jobName, e.getMessage()); +// } + } + + /** + * 暂停任务 + */ + @Override + public void pauseJob(String jobName) { + JobScheduleController jobScheduleController = JobRegistry.getInstance().getJobScheduleController(jobName); + if (jobScheduleController != null) { + jobScheduleController.pauseJob(); + } + } + + /** + * 立刻启动作业 + */ + @Override + public void start(String jobName) { + JobScheduleController jobScheduleController = JobRegistry.getInstance().getJobScheduleController(jobName); + if (jobScheduleController != null) { + jobScheduleController.triggerJob(); } } @@ -86,7 +121,6 @@ public class JobServiceImpl implements JobService { try { JobNodePath jobNodePath = new JobNodePath(jobName); LiteJobConfiguration liteJobConfig = LiteJobConfigurationGsonFactory.fromJson(center.get(jobNodePath.getConfigNodePath())); - job.setJobType(liteJobConfig.getTypeConfig().getJobType().name()); this.buildSimpleJobSettings(jobName, job, liteJobConfig); } catch (Exception e) { log.error("查询job任务:{} 错误 {}", jobName, e.getMessage()); @@ -102,7 +136,6 @@ public class JobServiceImpl implements JobService { public List getAllJobsDetails() { List jobNames = zookeeperRegistryCenter.getChildrenKeys("/"); List result = new ArrayList<>(jobNames.size()); - for (String each : jobNames) { JobBriefInfo jobBriefInfo = this.getJobBriefInfo(each); if (null != jobBriefInfo) { @@ -124,20 +157,12 @@ public class JobServiceImpl implements JobService { private void buildSimpleJobSettings(String jobName, Job job, LiteJobConfiguration liteJobConfig) { job.setJobName(jobName); - job.setJobType(liteJobConfig.getTypeConfig().getJobType().name()); job.setJobClass(liteJobConfig.getTypeConfig().getJobClass()); job.setShardingTotalCount(liteJobConfig.getTypeConfig().getCoreConfig().getShardingTotalCount()); job.setCron(liteJobConfig.getTypeConfig().getCoreConfig().getCron()); job.setShardingItemParameters(liteJobConfig.getTypeConfig().getCoreConfig().getShardingItemParameters()); job.setJobParameter(liteJobConfig.getTypeConfig().getCoreConfig().getJobParameter()); - job.setMonitorExecution(liteJobConfig.isMonitorExecution()); - job.setMaxTimeDiffSeconds(liteJobConfig.getMaxTimeDiffSeconds()); - job.setMonitorPort(liteJobConfig.getMonitorPort()); - job.setFailover(liteJobConfig.getTypeConfig().getCoreConfig().isFailover()); - job.setMisfire(liteJobConfig.getTypeConfig().getCoreConfig().isMisfire()); - job.setJobShardingStrategyClass(liteJobConfig.getJobShardingStrategyClass()); job.setDescription(liteJobConfig.getTypeConfig().getCoreConfig().getDescription()); - job.setReconcileIntervalMinutes(liteJobConfig.getReconcileIntervalMinutes()); job.setOverwrite(liteJobConfig.isOverwrite()); } @@ -179,7 +204,6 @@ public class JobServiceImpl implements JobService { ++disabledServerCount; } } - return disabledServerCount == serversPath.size(); } @@ -191,7 +215,6 @@ public class JobServiceImpl implements JobService { shardingInstances.add(instanceId); } } - return !instances.containsAll(shardingInstances) || shardingInstances.isEmpty(); } } diff --git a/elastic-job-springboot-test/src/main/java/com/example/Demo2Job.java b/elastic-job-springboot-test/src/main/java/com/example/Demo2Job.java deleted file mode 100644 index 4c84dbc..0000000 --- a/elastic-job-springboot-test/src/main/java/com/example/Demo2Job.java +++ /dev/null @@ -1,17 +0,0 @@ -package com.example; - -import com.dangdang.ddframe.job.api.ShardingContext; -import com.dangdang.ddframe.job.api.simple.SimpleJob; -import lombok.extern.slf4j.Slf4j; - -/** - * @author wjt - * @date 2021/11/26 - */ -@Slf4j -public class Demo2Job implements SimpleJob { - @Override - public void execute(ShardingContext shardingContext) { - log.info("注册成功"); - } -} diff --git a/elastic-job-springboot-test/src/main/java/com/example/DemoJob.java b/elastic-job-springboot-test/src/main/java/com/example/DemoJob.java index b1fa6d3..0516912 100644 --- a/elastic-job-springboot-test/src/main/java/com/example/DemoJob.java +++ b/elastic-job-springboot-test/src/main/java/com/example/DemoJob.java @@ -6,8 +6,7 @@ import com.example.jobInstance.ElasticSimpleJob; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; -import java.time.LocalDateTime; -import java.time.format.DateTimeFormatter; +import javax.annotation.Resource; /** * @author wujingtao @@ -15,12 +14,17 @@ import java.time.format.DateTimeFormatter; */ @Slf4j @ElasticSimpleJob( - jobName = "DemoJob", cron = "0 0/1 * * * ?", shardingTotalCount = 1, overwrite = true, jobEvent = false) + jobName = "DemoJob", cron = "0/5 * * * * ?", shardingTotalCount = 1, shardingItemParameters = "0=偶数,1=奇数,2=非奇非偶", overwrite = true) @Component public class DemoJob implements SimpleJob { + + @Resource + private Test test; + @Override public void execute(ShardingContext shardingContext) { - DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); - log.info("定时任务 :{} 启动,{}", "DemoJob", formatter.format(LocalDateTime.now())); +// log.info("定时任务 :{} 启动={}", "DemoJob", shardingContext.toString()) + String s = test.sysOut("9527"); + log.info("输出:{},任务参数为:{}", s, shardingContext.toString()); } } diff --git a/elastic-job-springboot-test/src/main/java/com/example/Test.java b/elastic-job-springboot-test/src/main/java/com/example/Test.java new file mode 100644 index 0000000..83f156d --- /dev/null +++ b/elastic-job-springboot-test/src/main/java/com/example/Test.java @@ -0,0 +1,15 @@ +package com.example; + +import org.springframework.stereotype.Service; + +/** + * @author wjt + * @date 2023/7/19 + */ +@Service +public class Test { + + public String sysOut(String code) { + return code; + } +}