[新增功能](master): 创建ElasticJob-Springboot工程

包含Starter的创建
master
土豆兄弟 3 years ago
commit fa486b2b72

47
.gitignore vendored

@ -0,0 +1,47 @@
# Created by .ignore support plugin (hsz.mobi)
### Java template
# Compiled class file
*.class
# Log file
*.log
# BlueJ files
*.ctxt
# Mobile Tools for Java (J2ME)
.mtj.tmp/
# Package Files #
*.jar
*.war
*.nar
*.ear
*.zip
*.tar.gz
*.rar
# virtual machine crash logs, see http://www.java.com/en/download/help/error_hotspot.xml
hs_err_pid*
### Example user template template
### Example user template
# IntelliJ project files
.idea
*.iml
out
gen
### Maven template
target/
pom.xml.tag
pom.xml.releaseBackup
pom.xml.versionsBackup
pom.xml.next
release.properties
dependency-reduced-pom.xml
buildNumber.properties
.mvn/timing.properties
# https://github.com/takari/maven-wrapper#usage-without-binary-jar
.mvn/wrapper/maven-wrapper.jar

@ -0,0 +1,7 @@
## 自动提示配置信息参考
https://docs.spring.io/spring-boot/docs/2.3.11.RELEASE/reference/html/appendix-configuration-metadata.html#configuration-metadata-annotation-processor-metadata-generation

@ -0,0 +1,50 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.example</groupId>
<artifactId>elastic-job-springboot</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<version>2.3.2.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<version>2.3.2.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>
<optional>true</optional>
<version>2.3.2.RELEASE</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.16</version>
</dependency>
<dependency>
<groupId>com.dangdang</groupId>
<artifactId>elastic-job-lite-core</artifactId>
<version>2.1.5</version>
</dependency>
</dependencies>
</project>

@ -0,0 +1,15 @@
package com.example.application;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
* @author q
*/
@SpringBootApplication
public class SpringBootElaticjobApplication {
public static void main(String[] args) {
SpringApplication.run(SpringBootElaticjobApplication.class, args);
}
}

@ -0,0 +1,71 @@
package com.example.application.job;
import com.dangdang.ddframe.job.api.ShardingContext;
import com.dangdang.ddframe.job.api.dataflow.DataflowJob;
import com.example.autoconfig.ElatisDataflowJob;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
/**
* @author q
*/
@Slf4j
@ElatisDataflowJob(
jobName = "MyDataflowJob",
cron = "0/3 * * * * ?",
shardingTotalCount = 2,
overwrite = true,
streamingProcess = true,
jobEvent = false
)
public class MyDataflowJob implements DataflowJob<Integer> {
private List<Integer> list;
{
list = Arrays.asList(1,2,3,4,5,6,7,8,9,10);
}
@Override
public List<Integer> fetchData(ShardingContext shardingContext) {
List<Integer> result = new ArrayList<>();
// 数字 % 分片总数 == 当前分片数
for (Integer index : list) {
if (index % shardingContext.getShardingTotalCount() == shardingContext.getShardingItem()){
result.add(index);
break;
}
}
// 模拟处理时间耗时
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.info("我是分片项: {} , 我抓取的数据是: {} ",shardingContext.getShardingItem(), result);
return result;
}
@Override
public void processData(ShardingContext shardingContext, List<Integer> result) {
// list.removeAll(result);
// 模拟处理时间耗时
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.info("我是分片项: {}, 我是处理的数据是: {}", shardingContext.getShardingItem(), result);
}
}

@ -0,0 +1,33 @@
package com.example.application.job;
import com.dangdang.ddframe.job.api.ShardingContext;
import com.dangdang.ddframe.job.api.simple.SimpleJob;
import com.example.autoconfig.ElasticSimpleJob;
import com.example.autoconfig.sharding.MyshardingStrategy;
import lombok.extern.slf4j.Slf4j;
/**
* Job
* @author q
*/
@Slf4j
@ElasticSimpleJob(
jobName = "MyShardingJob",
cron = "0/3 * * * * ?",
shardingTotalCount = 10,
overwrite = true,
jobStrategy = MyshardingStrategy.class,
jobEvent = false
)
public class MyShardingJob implements SimpleJob {
@Override
public void execute(ShardingContext shardingContext) {
log.info("{} : 我是分片项: {} , 总分片数 : {} ",
this.getClass().getSimpleName(),
shardingContext.getShardingItem(),
shardingContext.getShardingTotalCount()
);
}
}

@ -0,0 +1,29 @@
package com.example.application.job;
import com.dangdang.ddframe.job.api.ShardingContext;
import com.dangdang.ddframe.job.api.simple.SimpleJob;
import com.example.autoconfig.ElasticSimpleJob;
import lombok.extern.slf4j.Slf4j;
/**
* @author q
*/
@Slf4j
@ElasticSimpleJob(
jobName = "MySimpleJob",
cron = "0/3 * * * * ?",
shardingTotalCount = 2,
overwrite = true,
jobEvent = false
)
public class MySimpleJob implements SimpleJob {
@Override
public void execute(ShardingContext shardingContext) {
log.info("{} : 我是分片项: {} , 总分片数 : {} ",
this.getClass().getSimpleName(),
shardingContext.getShardingItem(),
shardingContext.getShardingTotalCount()
);
}
}

@ -0,0 +1,27 @@
package com.example.application.job.listener;
import com.dangdang.ddframe.job.executor.ShardingContexts;
import com.dangdang.ddframe.job.lite.api.listener.ElasticJobListener;
import lombok.extern.slf4j.Slf4j;
/**
* @author q
*/
@Slf4j
public class MyNormalListener implements ElasticJobListener{
/**
*
* @param shardingContexts
*/
@Override
public void beforeJobExecuted(ShardingContexts shardingContexts) {
log.info("我是 : {} 作业, 在方法前!", shardingContexts.getJobName());
}
@Override
public void afterJobExecuted(ShardingContexts shardingContexts) {
log.info("我是 : {} 作业, 在方法后!", shardingContexts.getJobName());
}
}

@ -0,0 +1,75 @@
package com.example.autoconfig;
import com.dangdang.ddframe.job.api.dataflow.DataflowJob;
import com.dangdang.ddframe.job.config.JobCoreConfiguration;
import com.dangdang.ddframe.job.config.dataflow.DataflowJobConfiguration;
import com.dangdang.ddframe.job.lite.api.JobScheduler;
import com.dangdang.ddframe.job.lite.api.listener.ElasticJobListener;
import com.dangdang.ddframe.job.lite.config.LiteJobConfiguration;
import com.dangdang.ddframe.job.reg.base.CoordinatorRegistryCenter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Configuration;
import javax.annotation.PostConstruct;
import java.util.Map;
/**
* @author q
*/
@Configuration
@ConditionalOnBean(CoordinatorRegistryCenter.class)
@AutoConfigureAfter(ZookeeperAutoConfig.class)
public class DataflowJobAutoConfig {
@Autowired
private CoordinatorRegistryCenter zkCenter;
@Autowired
private ApplicationContext applicationContext;
@PostConstruct
public void initDataflowJob(){
Map<String, Object> beans = applicationContext.getBeansWithAnnotation(ElatisDataflowJob.class);
beans.forEach((key, instance) -> {
Class<?>[] interfaces = instance.getClass().getInterfaces();
for (Class<?> anInterface : interfaces) {
if (anInterface == DataflowJob.class) {
ElatisDataflowJob annotation = instance.getClass().getAnnotation(ElatisDataflowJob.class);
String jobName = annotation.jobName();
String cron = annotation.cron();
int shardingTotalCount = annotation.shardingTotalCount();
boolean overwrite = annotation.overwrite();
boolean process = annotation.streamingProcess();
Class<?> jobStrategy = annotation.jobStrategy();
Class<? extends ElasticJobListener>[] jobListeners = annotation.jobListeners();
// Job核心配置
JobCoreConfiguration jcc = JobCoreConfiguration
.newBuilder(jobName, cron, shardingTotalCount)
.build();
// job类型配置
DataflowJobConfiguration djc = new DataflowJobConfiguration(
jcc,
instance.getClass().getCanonicalName(),
process
);
// job根配置
LiteJobConfiguration ljc = LiteJobConfiguration
.newBuilder(djc)
.jobShardingStrategyClass(jobStrategy.getCanonicalName())
.overwrite(overwrite)
.build();
new JobScheduler(zkCenter, ljc).init();
}
}
});
}
}

@ -0,0 +1,54 @@
package com.example.autoconfig;
import com.dangdang.ddframe.job.lite.api.listener.ElasticJobListener;
import com.dangdang.ddframe.job.lite.api.strategy.impl.AverageAllocationJobShardingStrategy;
import org.springframework.stereotype.Component;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
/**
* @author q
*/
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Component
public @interface ElasticSimpleJob {
/**
*
*/
String jobName() default "";
/**
* cron
*/
String cron() default "";
/**
*
*/
int shardingTotalCount() default 1;
/**
* Zookeeper
*/
boolean overwrite() default false;
/**
* 使
*/
Class<?> jobStrategy() default AverageAllocationJobShardingStrategy.class;
/**
* 使
*/
boolean jobEvent() default false;
/**
* 使
*/
Class<? extends ElasticJobListener>[] jobListeners() default {};
}

@ -0,0 +1,59 @@
package com.example.autoconfig;
import com.dangdang.ddframe.job.lite.api.listener.ElasticJobListener;
import com.dangdang.ddframe.job.lite.api.strategy.impl.AverageAllocationJobShardingStrategy;
import org.springframework.stereotype.Component;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
/**
* @author q
*/
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Component
public @interface ElatisDataflowJob {
/**
*
*/
String jobName() default "";
/**
* cron
*/
String cron() default "";
/**
*
*/
int shardingTotalCount() default 1;
/**
* Zookeeper
*/
boolean overwrite() default false;
/**
*
*/
boolean streamingProcess() default false;
/**
* 使
*/
Class<?> jobStrategy() default AverageAllocationJobShardingStrategy.class;
/**
* 使
*/
boolean jobEvent() default false;
/**
* 使
*/
Class<? extends ElasticJobListener>[] jobListeners() default {};
}

@ -0,0 +1,82 @@
package com.example.autoconfig;
import com.dangdang.ddframe.job.api.simple.SimpleJob;
import com.dangdang.ddframe.job.config.JobCoreConfiguration;
import com.dangdang.ddframe.job.config.simple.SimpleJobConfiguration;
import com.dangdang.ddframe.job.event.JobEventConfiguration;
import com.dangdang.ddframe.job.event.rdb.JobEventRdbConfiguration;
import com.dangdang.ddframe.job.lite.api.JobScheduler;
import com.dangdang.ddframe.job.lite.api.listener.ElasticJobListener;
import com.dangdang.ddframe.job.lite.config.LiteJobConfiguration;
import com.dangdang.ddframe.job.reg.base.CoordinatorRegistryCenter;
import com.example.autoconfig.sharding.MyshardingStrategy;
import org.quartz.JobDetail;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Configuration;
import javax.annotation.PostConstruct;
import javax.sql.DataSource;
import java.util.Map;
/**
* @author q
*/
@Configuration
@ConditionalOnBean(CoordinatorRegistryCenter.class)
@AutoConfigureAfter(ZookeeperAutoConfig.class)
public class SimpleJobAutoConfig {
@Autowired
private CoordinatorRegistryCenter center;
@Autowired
private ApplicationContext applicationContext;
// @Autowired
// private DataSource dataSource;
@PostConstruct
public void initSimpleJob(){
Map<String, Object> beans = applicationContext.getBeansWithAnnotation(ElasticSimpleJob.class);
beans.forEach((key, instance) -> {
Class<?>[] interfaces = instance.getClass().getInterfaces();
for (Class<?> anInterface : interfaces) {
if (anInterface == SimpleJob.class) {
ElasticSimpleJob annotation = instance.getClass().getAnnotation(ElasticSimpleJob.class);
String jobName = annotation.jobName();
String cron = annotation.cron();
int shardingTotalCount = annotation.shardingTotalCount();
boolean overwrite = annotation.overwrite();
Class<?> jobStrategy = annotation.jobStrategy();
boolean jobEvent = annotation.jobEvent();
Class<? extends ElasticJobListener>[] jobListeners = annotation.jobListeners();
JobCoreConfiguration jcc = JobCoreConfiguration
.newBuilder(jobName, cron, shardingTotalCount)
.build();
SimpleJobConfiguration sjc = new SimpleJobConfiguration(
jcc,
instance.getClass().getCanonicalName()
);
LiteJobConfiguration ljc = LiteJobConfiguration
.newBuilder(sjc)
.jobShardingStrategyClass(jobStrategy.getCanonicalName())
.overwrite(overwrite)
.build();
// 配置数据源
// JobEventConfiguration jec = new JobEventRdbConfiguration(dataSource);
new JobScheduler(center, ljc).init();
}
}
});
}
}

@ -0,0 +1,41 @@
package com.example.autoconfig;
import com.dangdang.ddframe.job.reg.base.CoordinatorRegistryCenter;
import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperConfiguration;
import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperRegistryCenter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
*
* @author q
*/
@ConditionalOnProperty("elasticjob.zookeeper.server-list")
@Configuration
@EnableConfigurationProperties(ZookeeperProperties.class)
public class ZookeeperAutoConfig {
@Autowired
private ZookeeperProperties zookeeperProperties;
/**
* Zookeeper
* @return Zookeeper
*/
@Bean(initMethod = "init")
public CoordinatorRegistryCenter zkCenter(){
ZookeeperConfiguration zookeeperConfiguration = new ZookeeperConfiguration(
zookeeperProperties.getServerList(),
zookeeperProperties.getNamespace()
);
ZookeeperRegistryCenter zookeeperRegistryCenter = new ZookeeperRegistryCenter(zookeeperConfiguration);
return zookeeperRegistryCenter;
}
}

@ -0,0 +1,27 @@
package com.example.autoconfig;
import lombok.Getter;
import lombok.Setter;
import org.springframework.boot.context.properties.ConfigurationProperties;
/**
* Zookeeper properties
* @author q
*/
@ConfigurationProperties(prefix = "elasticjob.zookeeper")
@Setter
@Getter
public class ZookeeperProperties {
/**
* Zookeeper
*/
private String serverList;
/**
* Zookeeper
*/
private String namespace;
// TODO: 2021/6/2 0002 其他的属性在这里进行配置即可
}

@ -0,0 +1,56 @@
package com.example.autoconfig.sharding;
import com.dangdang.ddframe.job.lite.api.strategy.JobInstance;
import com.dangdang.ddframe.job.lite.api.strategy.JobShardingStrategy;
import org.springframework.util.CollectionUtils;
import java.util.*;
/**
* -
*
* @author q
*/
public class MyshardingStrategy implements JobShardingStrategy {
/**
* @param jobInstances
* @param jobName
* @param shardingTotalCount
* @return
*/
@Override
public Map<JobInstance, List<Integer>> sharding(List<JobInstance> jobInstances, String jobName, int shardingTotalCount) {
// 创建返回值容器
Map<JobInstance, List<Integer>> rtnMap = new HashMap<>(10);
// 创建队列模型
// fixme 这个队列可以自行进行选择
ArrayDeque<Integer> queue = new ArrayDeque<>(shardingTotalCount);
for (int i = 0; i < shardingTotalCount; i++) {
queue.add(i);
}
// 轮询
while (queue.size() > 0) {
for (JobInstance jobInstance : jobInstances) {
if (queue.size() > 0){
Integer shardingItem = queue.pop();
List<Integer> shardingItemList = rtnMap.get(jobInstance);
if (CollectionUtils.isEmpty(shardingItemList)){
ArrayList<Integer> list = new ArrayList<>(100);
list.add(shardingItem);
rtnMap.put(jobInstance, shardingItemList);
}else {
shardingItemList.add(shardingItem);
}
}
}
}
return rtnMap;
}
}

@ -0,0 +1,16 @@
{
"properties": [
{
"name": "elasticjob.zookeeper.namespace",
"type": "java.lang.String",
"description": "Zookeeper 命名空间",
"sourceType": "com.example.autoconfig.ZookeeperProperties"
},
{
"name": "elasticjob.zookeeper.server-list",
"type": "java.lang.String",
"description": "Zookeeper 服务列表",
"sourceType": "com.example.autoconfig.ZookeeperProperties"
}
]
}

@ -0,0 +1,4 @@
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
com.example.autoconfig.ZookeeperAutoConfig,\
com.example.autoconfig.DataflowJobAutoConfig,\
com.example.autoconfig.SimpleJobAutoConfig

@ -0,0 +1,7 @@
server:
port: 8081
elasticjob:
zookeeper:
namespace: springboot-elasticjob
server-list: 192.168.72.100:2181
Loading…
Cancel
Save