增加api操作定时任务的逻辑

master
wujingtao 3 years ago
parent 7af7a501bf
commit dd907c021a

@ -0,0 +1,67 @@
package com.example.controller;
import com.example.entity.Job;
import com.example.entity.JobBriefInfo;
import com.example.serive.JobService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
import java.util.List;
/**
* @author wjt
* @date 2021/11/26
*/
@Slf4j
@RestController
public class JobController {
@Resource
private JobService jobService;
/**
*
*
* @param job
*/
@PostMapping("/add/job")
public void addJob(@RequestBody Job job) {
log.info("++++++++++++++++++++++++++++++++++++++++++++++++注册定时任务:{}", job.toString());
jobService.addJob(job);
}
/**
*
*
* @param jobName
*/
@GetMapping("/remove/job")
public void remove(String jobName) {
log.info("=+++++++++++++++++++++++++++++++++++++++++++++++删除定时任务:{}", jobName);
jobService.removeJob(jobName);
}
/**
*
*
* @param jobName
* @return
*/
@GetMapping("/get/job")
public Job getJobDetails(String jobName) {
return jobService.getJobDetail(jobName);
}
/**
*
*
* @return
*/
@GetMapping("/get/all")
public List<JobBriefInfo> getAllJobsDetails() {
return jobService.getAllJobsDetails();
}
}

@ -0,0 +1,128 @@
package com.example.entity;
import com.sun.istack.internal.NotNull;
import lombok.Data;
import java.io.Serializable;
/**
* @author wjt
* @date 2021/11/26
*/
@Data
public class Job implements Serializable {
private static final long serialVersionUID = -7135891833447229851L;
public Job() {
}
/**
*
*/
@NotNull
private String jobName;
/**
*
*/
private String jobType;
/**
*
*/
@NotNull
private String jobClass;
/**
*
*/
@NotNull
private String cron;
/**
*
*/
private int shardingTotalCount = 1;
private Boolean overwrite = true;
/**
* /
* 0
* 0=a,1=b,2=c
*/
private String shardingItemParameters="";
/**
* ,
*/
private String jobParameter="";
/**
*
*
*/
private boolean monitorExecution = true;
/**
*
* ,
*/
private boolean streamingProcess = false;
/**
*
* >-1
*/
private int maxTimeDiffSeconds = -1;
/**
*
*/
private int monitorPort = -1;
/**
*
*/
private boolean failover = false;
/**
*
*/
private boolean misfire = false;
/**
* ,使
*/
private String jobShardingStrategyClass = "";
/**
*
*/
private String description="";
/**
*
*/
private JobProperties jobProperties = new JobProperties();
/**
*
*/
private String scriptCommandLine="";
/**
* 1,
*/
private int reconcileIntervalMinutes = 10;
/**
* ,
*/
private boolean disabled = false;
/**
* ElasticJobListener
* @return
*/
private String listener = "";
/**
* AbstractDistributeOnceElasticJobListener
* @return
*/
private String distributedListener = "";
/**
* ,
* @return
*/
private long startedTimeoutMilliseconds = Long.MAX_VALUE;
/**
* ,
* @return
*/
private long completedTimeoutMilliseconds = Long.MAX_VALUE;
}

@ -0,0 +1,48 @@
package com.example.entity;
import lombok.Data;
import java.io.Serializable;
/**
* @author wjt
* @date 2021/11/30
*/
@Data
public class JobBriefInfo implements Serializable,Comparable<JobBriefInfo> {
private static final long serialVersionUID = -2150981397641113926L;
private String jobName;
private JobBriefInfo.JobStatus status;
private String description;
private String cron;
private int instanceCount;
private int shardingTotalCount;
@Override
public int compareTo(JobBriefInfo o) {
return this.getJobName().compareTo(o.getJobName());
}
public static enum JobStatus {
/**
*
*/
OK,
/**
* 线
*/
CRASHED,
/**
*
*/
DISABLED,
/**
*
*/
SHARDING_FLAG;
private JobStatus() {
}
}
}

@ -0,0 +1,42 @@
package com.example.serive;
import com.example.entity.Job;
import com.example.entity.JobBriefInfo;
import java.util.List;
/**
* @author wjt
* @date 2021/11/26
*/
public interface JobService {
/**
* job
*
* @param job
*/
void addJob(Job job);
/**
*
*
* @param jobName
*/
void removeJob(String jobName);
/**
* job
*
* @param jobName
* @return
*/
Job getJobDetail(String jobName);
/**
*
*
* @return
*/
List<JobBriefInfo> getAllJobsDetails();
}

@ -0,0 +1,232 @@
package com.example.serive.impl;
import com.dangdang.ddframe.job.config.JobCoreConfiguration;
import com.dangdang.ddframe.job.config.simple.SimpleJobConfiguration;
import com.dangdang.ddframe.job.executor.handler.JobProperties;
import com.dangdang.ddframe.job.lite.config.LiteJobConfiguration;
import com.dangdang.ddframe.job.lite.internal.config.LiteJobConfigurationGsonFactory;
import com.dangdang.ddframe.job.lite.internal.storage.JobNodePath;
import com.dangdang.ddframe.job.lite.spring.api.SpringJobScheduler;
import com.dangdang.ddframe.job.reg.base.CoordinatorRegistryCenter;
import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperRegistryCenter;
import com.example.entity.Job;
import com.example.entity.JobBriefInfo;
import com.example.serive.JobService;
import lombok.extern.slf4j.Slf4j;
import org.apache.curator.framework.CuratorFramework;
import org.springframework.beans.factory.config.BeanDefinition;
import org.springframework.beans.factory.support.BeanDefinitionBuilder;
import org.springframework.beans.factory.support.DefaultListableBeanFactory;
import org.springframework.beans.factory.support.ManagedList;
import org.springframework.context.ApplicationContext;
import org.springframework.stereotype.Service;
import org.springframework.util.StringUtils;
import javax.annotation.Resource;
import java.util.*;
/**
* @author wjt
* @date 2021/11/26
*/
@Slf4j
@Service
public class JobServiceImpl implements JobService {
@Resource
private CoordinatorRegistryCenter center;
@Resource
private ZookeeperRegistryCenter zookeeperRegistryCenter;
// @Resource
// private DataSource dataSource;
@Resource
private ApplicationContext ctx;
@Override
public void addJob(Job job) {
boolean overwrite = job.getOverwrite();
JobCoreConfiguration jcc = JobCoreConfiguration
.newBuilder(job.getJobName(), job.getCron(), job.getShardingTotalCount())
.shardingItemParameters(job.getShardingItemParameters())
.description(job.getDescription())
.failover(job.isFailover())
.jobParameter(job.getJobParameter())
.misfire(job.isMisfire())
.jobProperties(JobProperties.JobPropertiesEnum.JOB_EXCEPTION_HANDLER.getKey(), job.getJobProperties().getJobExceptionHandler())
.jobProperties(JobProperties.JobPropertiesEnum.EXECUTOR_SERVICE_HANDLER.getKey(), job.getJobProperties().getExecutorServiceHandler())
.build();
SimpleJobConfiguration sjc = new SimpleJobConfiguration(
jcc, job.getJobClass()
);
LiteJobConfiguration ljc = LiteJobConfiguration
.newBuilder(sjc)
.overwrite(overwrite)
.disabled(job.isDisabled())
.monitorPort(job.getMonitorPort())
.monitorExecution(job.isMonitorExecution())
.maxTimeDiffSeconds(job.getMaxTimeDiffSeconds())
.jobShardingStrategyClass(job.getJobShardingStrategyClass())
.reconcileIntervalMinutes(job.getReconcileIntervalMinutes())
.build();
// 配置数据源
// JobEventConfiguration jec = new JobEventRdbConfiguration(dataSource);
//配置监听
// MyNormalListener myNormalListener = new MyNormalListener();
List<BeanDefinition> elasticJobListeners = getTargetElasticJobListeners(job);
// 构建SpringJobScheduler对象来初始化任务
BeanDefinitionBuilder factory = BeanDefinitionBuilder.rootBeanDefinition(SpringJobScheduler.class);
factory.setScope(BeanDefinition.SCOPE_PROTOTYPE);
// 脚步执行类型
// factory.addConstructorArgValue(null);
// factory.addConstructorArgValue(jec);
BeanDefinitionBuilder rdbFactory = BeanDefinitionBuilder.rootBeanDefinition(job.getJobClass());
factory.addConstructorArgValue(rdbFactory.getBeanDefinition());
factory.addConstructorArgValue(zookeeperRegistryCenter);
factory.addConstructorArgValue(ljc);
factory.addConstructorArgValue(elasticJobListeners);
DefaultListableBeanFactory defaultListableBeanFactory = (DefaultListableBeanFactory) ctx.getAutowireCapableBeanFactory();
defaultListableBeanFactory.registerBeanDefinition("SpringJobScheduler" + job.getJobName(), factory.getBeanDefinition());
SpringJobScheduler springJobScheduler = (SpringJobScheduler) ctx.getBean("SpringJobScheduler" + job.getJobName());
springJobScheduler.init();
log.info("【" + job.getJobName() + "】\t" + job.getJobClass() + "\t 注册成功");
}
@Override
public void removeJob(String jobName) {
try {
CuratorFramework client = zookeeperRegistryCenter.getClient();
client.delete().deletingChildrenIfNeeded().forPath("/" + jobName);
} catch (Exception e) {
log.error("删除任务:{} 错误 {}", jobName, e.getMessage());
}
}
/**
*
*/
@Override
public Job getJobDetail(String jobName) {
Job job = new Job();
try {
JobNodePath jobNodePath = new JobNodePath(jobName);
LiteJobConfiguration liteJobConfig = LiteJobConfigurationGsonFactory.fromJson(center.get(jobNodePath.getConfigNodePath()));
job.setJobType(liteJobConfig.getTypeConfig().getJobType().name());
this.buildSimpleJobSettings(jobName, job, liteJobConfig);
} catch (Exception e) {
log.error("查询job任务{} 错误 {}", jobName, e.getMessage());
}
return job;
}
@Override
public List<JobBriefInfo> getAllJobsDetails() {
List<String> jobNames = zookeeperRegistryCenter.getChildrenKeys("/");
List<JobBriefInfo> result = new ArrayList<>(jobNames.size());
for (String each : jobNames) {
JobBriefInfo jobBriefInfo = this.getJobBriefInfo(each);
if (null != jobBriefInfo) {
result.add(jobBriefInfo);
}
}
Collections.sort(result);
return result;
}
private List<BeanDefinition> getTargetElasticJobListeners(Job job) {
List<BeanDefinition> result = new ManagedList<>(2);
String listeners = job.getListener();
if (StringUtils.hasText(listeners)) {
BeanDefinitionBuilder factory = BeanDefinitionBuilder.rootBeanDefinition(listeners);
factory.setScope(BeanDefinition.SCOPE_PROTOTYPE);
result.add(factory.getBeanDefinition());
}
String distributedListeners = job.getDistributedListener();
long startedTimeoutMilliseconds = job.getStartedTimeoutMilliseconds();
long completedTimeoutMilliseconds = job.getCompletedTimeoutMilliseconds();
if (StringUtils.hasText(distributedListeners)) {
BeanDefinitionBuilder factory = BeanDefinitionBuilder.rootBeanDefinition(distributedListeners);
factory.setScope(BeanDefinition.SCOPE_PROTOTYPE);
factory.addConstructorArgValue(startedTimeoutMilliseconds);
factory.addConstructorArgValue(completedTimeoutMilliseconds);
result.add(factory.getBeanDefinition());
}
return result;
}
private void buildSimpleJobSettings(String jobName, Job job, LiteJobConfiguration liteJobConfig) {
job.setJobName(jobName);
job.setJobType(liteJobConfig.getTypeConfig().getJobType().name());
job.setJobClass(liteJobConfig.getTypeConfig().getJobClass());
job.setShardingTotalCount(liteJobConfig.getTypeConfig().getCoreConfig().getShardingTotalCount());
job.setCron(liteJobConfig.getTypeConfig().getCoreConfig().getCron());
job.setShardingItemParameters(liteJobConfig.getTypeConfig().getCoreConfig().getShardingItemParameters());
job.setJobParameter(liteJobConfig.getTypeConfig().getCoreConfig().getJobParameter());
job.setMonitorExecution(liteJobConfig.isMonitorExecution());
job.setMaxTimeDiffSeconds(liteJobConfig.getMaxTimeDiffSeconds());
job.setMonitorPort(liteJobConfig.getMonitorPort());
job.setFailover(liteJobConfig.getTypeConfig().getCoreConfig().isFailover());
job.setMisfire(liteJobConfig.getTypeConfig().getCoreConfig().isMisfire());
job.setJobShardingStrategyClass(liteJobConfig.getJobShardingStrategyClass());
job.setDescription(liteJobConfig.getTypeConfig().getCoreConfig().getDescription());
job.setReconcileIntervalMinutes(liteJobConfig.getReconcileIntervalMinutes());
job.setOverwrite(liteJobConfig.isOverwrite());
}
public JobBriefInfo getJobBriefInfo(String jobName) {
JobNodePath jobNodePath = new JobNodePath(jobName);
JobBriefInfo result = new JobBriefInfo();
result.setJobName(jobName);
String liteJobConfigJson = center.get(jobNodePath.getConfigNodePath());
if (null == liteJobConfigJson) {
return null;
} else {
LiteJobConfiguration liteJobConfig = LiteJobConfigurationGsonFactory.fromJson(liteJobConfigJson);
result.setDescription(liteJobConfig.getTypeConfig().getCoreConfig().getDescription());
result.setCron(liteJobConfig.getTypeConfig().getCoreConfig().getCron());
result.setInstanceCount(center.getChildrenKeys((new JobNodePath(jobName)).getInstancesNodePath()).size());
result.setShardingTotalCount(liteJobConfig.getTypeConfig().getCoreConfig().getShardingTotalCount());
result.setStatus(this.getJobStatus(jobName));
return result;
}
}
private JobBriefInfo.JobStatus getJobStatus(String jobName) {
JobNodePath jobNodePath = new JobNodePath(jobName);
List<String> instances = center.getChildrenKeys(jobNodePath.getInstancesNodePath());
if (instances.isEmpty()) {
return JobBriefInfo.JobStatus.CRASHED;
} else if (this.isAllDisabled(jobNodePath)) {
return JobBriefInfo.JobStatus.DISABLED;
} else {
return this.isHasShardingFlag(jobNodePath, instances) ? JobBriefInfo.JobStatus.SHARDING_FLAG : JobBriefInfo.JobStatus.OK;
}
}
private boolean isAllDisabled(JobNodePath jobNodePath) {
List<String> serversPath = zookeeperRegistryCenter.getChildrenKeys(jobNodePath.getServerNodePath());
int disabledServerCount = 0;
for (String each : serversPath) {
if (JobBriefInfo.JobStatus.DISABLED.name().equals(center.get(jobNodePath.getServerNodePath(each)))) {
++disabledServerCount;
}
}
return disabledServerCount == serversPath.size();
}
private boolean isHasShardingFlag(JobNodePath jobNodePath, List<String> instances) {
Set<String> shardingInstances = new HashSet<>();
for (String each : center.getChildrenKeys(jobNodePath.getShardingNodePath())) {
String instanceId = center.get(jobNodePath.getShardingNodePath(each, "instance"));
if (null != instanceId && !instanceId.isEmpty()) {
shardingInstances.add(instanceId);
}
}
return !instances.containsAll(shardingInstances) || shardingInstances.isEmpty();
}
}

@ -0,0 +1,17 @@
package com.example;
import com.dangdang.ddframe.job.api.ShardingContext;
import com.dangdang.ddframe.job.api.simple.SimpleJob;
import lombok.extern.slf4j.Slf4j;
/**
* @author wjt
* @date 2021/11/26
*/
@Slf4j
public class Demo2Job implements SimpleJob {
@Override
public void execute(ShardingContext shardingContext) {
log.info("注册成功");
}
}

@ -15,7 +15,7 @@ import java.time.format.DateTimeFormatter;
*/
@Slf4j
@ElasticSimpleJob(
jobName = "DemoJob", cron = "0 0/1 * * * ?", shardingTotalCount = 1, overwrite = true, jobEvent = false)
jobName = "DemoJob", cron = "0/30 * * * * ?", shardingTotalCount = 1, overwrite = true, jobEvent = false)
@Component
public class DemoJob implements SimpleJob {
@Override

@ -45,11 +45,11 @@
<artifactId>elastic-job-lite-core</artifactId>
<version>2.1.5</version>
</dependency>
<!-- <dependency>-->
<!-- <groupId>com.dangdang</groupId>-->
<!-- <artifactId>elastic-job-lite-spring</artifactId>-->
<!-- <version>2.1.5</version>-->
<!-- </dependency>-->
<dependency>
<groupId>com.dangdang</groupId>
<artifactId>elastic-job-lite-spring</artifactId>
<version>2.1.5</version>
</dependency>
</dependencies>
<build>
<finalName>${project.name}</finalName>

Loading…
Cancel
Save