更新job配置

master
wjt 1 year ago
parent d40b2ff25e
commit 46218fc844

@ -3,9 +3,9 @@ package com.example.config;
import com.dangdang.ddframe.job.api.simple.SimpleJob;
import com.dangdang.ddframe.job.config.JobCoreConfiguration;
import com.dangdang.ddframe.job.config.simple.SimpleJobConfiguration;
import com.dangdang.ddframe.job.lite.api.JobScheduler;
import com.dangdang.ddframe.job.lite.api.listener.ElasticJobListener;
import com.dangdang.ddframe.job.lite.api.strategy.impl.AverageAllocationJobShardingStrategy;
import com.dangdang.ddframe.job.lite.config.LiteJobConfiguration;
import com.dangdang.ddframe.job.lite.spring.api.SpringJobScheduler;
import com.dangdang.ddframe.job.reg.base.CoordinatorRegistryCenter;
import com.example.jobInstance.ElasticSimpleJob;
import lombok.extern.slf4j.Slf4j;
@ -13,13 +13,16 @@ import org.springframework.boot.autoconfigure.AutoConfigureAfter;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Configuration;
import org.springframework.util.StringUtils;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import javax.sql.DataSource;
import java.util.Map;
/**
* @author q
* @author wjt
* @date 2023/7/19
*/
@Slf4j
@Configuration
@ -32,43 +35,58 @@ public class SimpleJobAutoConfig {
@Resource
private ApplicationContext applicationContext;
// @Resource
// private DataSource dataSource;
@PostConstruct
public void initSimpleJob() {
Map<String, Object> beans = applicationContext.getBeansWithAnnotation(ElasticSimpleJob.class);
beans.forEach((key, instance) -> {
Class<?>[] interfaces = instance.getClass().getInterfaces();
for (Class<?> anInterface : interfaces) {
if (anInterface == SimpleJob.class) {
ElasticSimpleJob annotation = instance.getClass().getAnnotation(ElasticSimpleJob.class);
Map<String, SimpleJob> map = applicationContext.getBeansOfType(SimpleJob.class);
for (Map.Entry<String, SimpleJob> entry : map.entrySet()) {
//任务类
SimpleJob simpleJob = entry.getValue();
//注解实体
ElasticSimpleJob annotation = simpleJob.getClass().getAnnotation(ElasticSimpleJob.class);
if (annotation == null) {
continue;
}
String jobName = annotation.jobName();
String cron = annotation.cron();
int shardingTotalCount = annotation.shardingTotalCount();
//是否刷新zk
boolean overwrite = annotation.overwrite();
Class<?> jobStrategy = annotation.jobStrategy();
//分片参数
String parameters = annotation.shardingItemParameters();
//是否使用事件追踪
boolean jobEvent = annotation.jobEvent();
Class<? extends ElasticJobListener>[] jobListeners = annotation.jobListeners();
JobCoreConfiguration jcc = JobCoreConfiguration
.newBuilder(jobName, cron, shardingTotalCount)
.build();
JobCoreConfiguration.Builder jccBuilder = JobCoreConfiguration
.newBuilder(jobName, cron, shardingTotalCount);
if (!StringUtils.isEmpty(parameters)) {
jccBuilder.shardingItemParameters(parameters);
}
SimpleJobConfiguration sjc = new SimpleJobConfiguration(
jcc,
instance.getClass().getCanonicalName()
jccBuilder.build(),
simpleJob.getClass().getCanonicalName()
);
LiteJobConfiguration ljc = LiteJobConfiguration
.newBuilder(sjc)
.jobShardingStrategyClass(jobStrategy.getCanonicalName())
//分片策略,可以写进注解
.jobShardingStrategyClass(AverageAllocationJobShardingStrategy.class.getCanonicalName())
.overwrite(overwrite)
.build();
// 配置数据源
// 配置数据源,事件追踪
// if (jobEvent) {
// JobEventConfiguration jec = new JobEventRdbConfiguration(dataSource);
//配置监听
// MyNormalListener myNormalListener = new MyNormalListener();
new JobScheduler(center, ljc).init();
}
// new SpringJobScheduler(simpleJob, center, ljc, jec).init();
// } else {
// new SpringJobScheduler(simpleJob, center, ljc).init();
// }
new SpringJobScheduler(simpleJob, center, ljc).init();
}
});
}
}

@ -3,7 +3,6 @@ package com.example.config;
import com.dangdang.ddframe.job.reg.base.CoordinatorRegistryCenter;
import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperConfiguration;
import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperRegistryCenter;
import com.example.config.ZookeeperProperties;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;

@ -1,12 +1,8 @@
package com.example.entity;
import com.dangdang.ddframe.job.executor.handler.JobProperties;
import com.sun.istack.internal.NotNull;
import lombok.Data;
import java.io.Serializable;
import java.util.LinkedHashMap;
import java.util.Map;
/**
* @author wjt
@ -22,85 +18,42 @@ public class Job implements Serializable {
/**
*
*/
@NotNull
private String jobName;
/**
*
*/
private String jobType;
/**
*
*/
@NotNull
private String jobClass;
/**
*
*/
@NotNull
private String cron;
/**
*
*/
private int shardingTotalCount = 1;
/**
*
*/
private String jobClass;
/**
* /
* 0
* 0=a,1=b,2=c
*/
private String shardingItemParameters = "";
/**
* ,
*/
private String jobParameter = "";
/**
*
*
*/
private boolean monitorExecution = true;
/**
*
* ,
*/
private boolean streamingProcess = false;
/**
*
* >-1
*/
private int maxTimeDiffSeconds = -1;
/**
*
*/
private int monitorPort = -1;
/**
*
*/
private boolean failover = false;
/**
*
*/
private boolean misfire = false;
/**
* ,使
*/
private String jobShardingStrategyClass = "";
/**
*
*/
private String description = "";
private Boolean overwrite = true;
/**
*
*/
private String scriptCommandLine = "";
/**
* 1,
*/
private int reconcileIntervalMinutes = 10;
/**
*
* 使,datasource
*/
private Map<String, String> jobProperties = new LinkedHashMap(JobProperties.JobPropertiesEnum.values().length, 1.0F);
private Boolean overwrite = true;
boolean jobEvent = false;
}

@ -1,7 +1,5 @@
package com.example.jobInstance;
import com.dangdang.ddframe.job.lite.api.listener.ElasticJobListener;
import com.dangdang.ddframe.job.lite.api.strategy.impl.AverageAllocationJobShardingStrategy;
import org.springframework.stereotype.Component;
import java.lang.annotation.ElementType;
@ -10,7 +8,8 @@ import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
/**
* @author q
* @author wjt
* @date 2023/7/19
*/
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@ -33,23 +32,18 @@ public @interface ElasticSimpleJob {
int shardingTotalCount() default 1;
/**
* Zookeeper
*
*/
boolean overwrite() default false;
String shardingItemParameters() default "";
/**
* 使
* Zookeeper
*/
Class<?> jobStrategy() default AverageAllocationJobShardingStrategy.class;
boolean overwrite() default true;
/**
* 使
* 使,datasource
*
*/
boolean jobEvent() default false;
/**
* 使
*/
Class<? extends ElasticJobListener>[] jobListeners() default {};
}

@ -46,4 +46,18 @@ public interface JobService {
* @param job
*/
void updateJob(Job job);
/**
*
*
* @param jobName
*/
void pauseJob(String jobName);
/**
*
*
* @param jobName
*/
void start(String jobName);
}

@ -1,12 +1,15 @@
package com.example.serive.impl;
import com.dangdang.ddframe.job.api.ElasticJob;
import com.dangdang.ddframe.job.config.JobCoreConfiguration;
import com.dangdang.ddframe.job.config.simple.SimpleJobConfiguration;
import com.dangdang.ddframe.job.executor.handler.JobProperties;
import com.dangdang.ddframe.job.lite.api.JobScheduler;
import com.dangdang.ddframe.job.lite.api.strategy.impl.AverageAllocationJobShardingStrategy;
import com.dangdang.ddframe.job.lite.config.LiteJobConfiguration;
import com.dangdang.ddframe.job.lite.internal.config.LiteJobConfigurationGsonFactory;
import com.dangdang.ddframe.job.lite.internal.schedule.JobRegistry;
import com.dangdang.ddframe.job.lite.internal.schedule.JobScheduleController;
import com.dangdang.ddframe.job.lite.internal.storage.JobNodePath;
import com.dangdang.ddframe.job.lite.spring.api.SpringJobScheduler;
import com.dangdang.ddframe.job.reg.base.CoordinatorRegistryCenter;
import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperRegistryCenter;
import com.example.entity.Job;
@ -15,10 +18,10 @@ import com.example.serive.JobService;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import lombok.extern.slf4j.Slf4j;
import org.apache.curator.framework.CuratorFramework;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import javax.sql.DataSource;
import java.util.*;
/**
@ -32,9 +35,9 @@ public class JobServiceImpl implements JobService {
private CoordinatorRegistryCenter center;
@Resource
private ZookeeperRegistryCenter zookeeperRegistryCenter;
// @Resource
// private DataSource dataSource;
@Override
public void addJob(Job job) {
boolean overwrite = job.getOverwrite();
@ -42,11 +45,8 @@ public class JobServiceImpl implements JobService {
.newBuilder(job.getJobName(), job.getCron(), job.getShardingTotalCount())
.shardingItemParameters(job.getShardingItemParameters())
.description(job.getDescription())
.failover(job.isFailover())
.jobParameter(job.getJobParameter())
.misfire(job.isMisfire())
.jobProperties(JobProperties.JobPropertiesEnum.JOB_EXCEPTION_HANDLER.getKey(), job.getJobProperties().get("JOB_EXCEPTION_HANDLER"))
.jobProperties(JobProperties.JobPropertiesEnum.EXECUTOR_SERVICE_HANDLER.getKey(), job.getJobProperties().get("EXECUTOR_SERVICE_HANDLER"))
.misfire(true)
.build();
SimpleJobConfiguration sjc = new SimpleJobConfiguration(
jcc, job.getJobClass()
@ -54,26 +54,61 @@ public class JobServiceImpl implements JobService {
LiteJobConfiguration ljc = LiteJobConfiguration
.newBuilder(sjc)
.overwrite(overwrite)
.monitorPort(job.getMonitorPort())
.monitorExecution(job.isMonitorExecution())
.maxTimeDiffSeconds(job.getMaxTimeDiffSeconds())
.jobShardingStrategyClass(job.getJobShardingStrategyClass())
.reconcileIntervalMinutes(job.getReconcileIntervalMinutes())
.jobShardingStrategyClass(AverageAllocationJobShardingStrategy.class.getCanonicalName())
.build();
try {
ElasticJob elasticJob = (ElasticJob) Class.forName(job.getJobClass()).newInstance();
// 配置数据源
// if (job.isJobEvent()) {
// JobEventConfiguration jec = new JobEventRdbConfiguration(dataSource);
//配置监听
// MyNormalListener myNormalListener = new MyNormalListener();
new JobScheduler(center, ljc).init();
// new SpringJobScheduler(elasticJob, center, ljc, jec);
// } else {
// new SpringJobScheduler(elasticJob, center, ljc);
// }
new SpringJobScheduler(elasticJob, center, ljc);
} catch (Exception e) {
e.printStackTrace();
log.error("加载任务失败,任务为 {}", job.getJobName());
}
}
@Override
public void removeJob(String jobName) {
try {
CuratorFramework client = zookeeperRegistryCenter.getClient();
client.delete().deletingChildrenIfNeeded().forPath("/" + jobName);
} catch (Exception e) {
log.error("删除任务:{} 错误 {}", jobName, e.getMessage());
JobScheduleController jobScheduleController = JobRegistry.getInstance().getJobScheduleController(jobName);
if (jobScheduleController != null) {
jobScheduleController.pauseJob();
jobScheduleController.shutdown();
zookeeperRegistryCenter.remove("/" + jobName);
}
//zk直接删除任务
// try {
// CuratorFramework client = zookeeperRegistryCenter.getClient();
// client.delete().deletingChildrenIfNeeded().forPath("/" + jobName);
// } catch (Exception e) {
// log.error("删除任务:{} 错误 {}", jobName, e.getMessage());
// }
}
/**
*
*/
@Override
public void pauseJob(String jobName) {
JobScheduleController jobScheduleController = JobRegistry.getInstance().getJobScheduleController(jobName);
if (jobScheduleController != null) {
jobScheduleController.pauseJob();
}
}
/**
*
*/
@Override
public void start(String jobName) {
JobScheduleController jobScheduleController = JobRegistry.getInstance().getJobScheduleController(jobName);
if (jobScheduleController != null) {
jobScheduleController.triggerJob();
}
}
@ -86,7 +121,6 @@ public class JobServiceImpl implements JobService {
try {
JobNodePath jobNodePath = new JobNodePath(jobName);
LiteJobConfiguration liteJobConfig = LiteJobConfigurationGsonFactory.fromJson(center.get(jobNodePath.getConfigNodePath()));
job.setJobType(liteJobConfig.getTypeConfig().getJobType().name());
this.buildSimpleJobSettings(jobName, job, liteJobConfig);
} catch (Exception e) {
log.error("查询job任务{} 错误 {}", jobName, e.getMessage());
@ -102,7 +136,6 @@ public class JobServiceImpl implements JobService {
public List<JobBriefInfo> getAllJobsDetails() {
List<String> jobNames = zookeeperRegistryCenter.getChildrenKeys("/");
List<JobBriefInfo> result = new ArrayList<>(jobNames.size());
for (String each : jobNames) {
JobBriefInfo jobBriefInfo = this.getJobBriefInfo(each);
if (null != jobBriefInfo) {
@ -124,20 +157,12 @@ public class JobServiceImpl implements JobService {
private void buildSimpleJobSettings(String jobName, Job job, LiteJobConfiguration liteJobConfig) {
job.setJobName(jobName);
job.setJobType(liteJobConfig.getTypeConfig().getJobType().name());
job.setJobClass(liteJobConfig.getTypeConfig().getJobClass());
job.setShardingTotalCount(liteJobConfig.getTypeConfig().getCoreConfig().getShardingTotalCount());
job.setCron(liteJobConfig.getTypeConfig().getCoreConfig().getCron());
job.setShardingItemParameters(liteJobConfig.getTypeConfig().getCoreConfig().getShardingItemParameters());
job.setJobParameter(liteJobConfig.getTypeConfig().getCoreConfig().getJobParameter());
job.setMonitorExecution(liteJobConfig.isMonitorExecution());
job.setMaxTimeDiffSeconds(liteJobConfig.getMaxTimeDiffSeconds());
job.setMonitorPort(liteJobConfig.getMonitorPort());
job.setFailover(liteJobConfig.getTypeConfig().getCoreConfig().isFailover());
job.setMisfire(liteJobConfig.getTypeConfig().getCoreConfig().isMisfire());
job.setJobShardingStrategyClass(liteJobConfig.getJobShardingStrategyClass());
job.setDescription(liteJobConfig.getTypeConfig().getCoreConfig().getDescription());
job.setReconcileIntervalMinutes(liteJobConfig.getReconcileIntervalMinutes());
job.setOverwrite(liteJobConfig.isOverwrite());
}
@ -179,7 +204,6 @@ public class JobServiceImpl implements JobService {
++disabledServerCount;
}
}
return disabledServerCount == serversPath.size();
}
@ -191,7 +215,6 @@ public class JobServiceImpl implements JobService {
shardingInstances.add(instanceId);
}
}
return !instances.containsAll(shardingInstances) || shardingInstances.isEmpty();
}
}

@ -1,17 +0,0 @@
package com.example;
import com.dangdang.ddframe.job.api.ShardingContext;
import com.dangdang.ddframe.job.api.simple.SimpleJob;
import lombok.extern.slf4j.Slf4j;
/**
* @author wjt
* @date 2021/11/26
*/
@Slf4j
public class Demo2Job implements SimpleJob {
@Override
public void execute(ShardingContext shardingContext) {
log.info("注册成功");
}
}

@ -6,8 +6,7 @@ import com.example.jobInstance.ElasticSimpleJob;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import javax.annotation.Resource;
/**
* @author wujingtao
@ -15,12 +14,17 @@ import java.time.format.DateTimeFormatter;
*/
@Slf4j
@ElasticSimpleJob(
jobName = "DemoJob", cron = "0 0/1 * * * ?", shardingTotalCount = 1, overwrite = true, jobEvent = false)
jobName = "DemoJob", cron = "0/5 * * * * ?", shardingTotalCount = 1, shardingItemParameters = "0=偶数,1=奇数,2=非奇非偶", overwrite = true)
@Component
public class DemoJob implements SimpleJob {
@Resource
private Test test;
@Override
public void execute(ShardingContext shardingContext) {
DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
log.info("定时任务 :{} 启动,{}", "DemoJob", formatter.format(LocalDateTime.now()));
// log.info("定时任务 :{} 启动={}", "DemoJob", shardingContext.toString())
String s = test.sysOut("9527");
log.info("输出:{},任务参数为:{}", s, shardingContext.toString());
}
}

@ -0,0 +1,15 @@
package com.example;
import org.springframework.stereotype.Service;
/**
* @author wjt
* @date 2023/7/19
*/
@Service
public class Test {
public String sysOut(String code) {
return code;
}
}
Loading…
Cancel
Save