From fa486b2b7223c7933cabe9b78dac5f30fcc4d2aa Mon Sep 17 00:00:00 2001 From: qyx <565485304@qq.com> Date: Tue, 22 Jun 2021 13:51:23 +0800 Subject: [PATCH] =?UTF-8?q?[=E6=96=B0=E5=A2=9E=E5=8A=9F=E8=83=BD](master):?= =?UTF-8?q?=20=E5=88=9B=E5=BB=BAElasticJob-Springboot=E5=B7=A5=E7=A8=8B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 包含Starter的创建 --- .gitignore | 47 +++++++++++ README.md | 7 ++ pom.xml | 50 +++++++++++ .../SpringBootElaticjobApplication.java | 15 ++++ .../application/job/MyDataflowJob.java | 71 ++++++++++++++++ .../application/job/MyShardingJob.java | 33 ++++++++ .../example/application/job/MySimpleJob.java | 29 +++++++ .../job/listener/MyNormalListener.java | 27 ++++++ .../autoconfig/DataflowJobAutoConfig.java | 75 +++++++++++++++++ .../example/autoconfig/ElasticSimpleJob.java | 54 ++++++++++++ .../example/autoconfig/ElatisDataflowJob.java | 59 +++++++++++++ .../autoconfig/SimpleJobAutoConfig.java | 82 +++++++++++++++++++ .../autoconfig/ZookeeperAutoConfig.java | 41 ++++++++++ .../autoconfig/ZookeeperProperties.java | 27 ++++++ .../sharding/MyshardingStrategy.java | 56 +++++++++++++ .../spring-configuration-metadata.json | 16 ++++ src/main/resources/META-INF/spring.factories | 4 + src/main/resources/application.yml | 7 ++ 18 files changed, 700 insertions(+) create mode 100644 .gitignore create mode 100644 README.md create mode 100644 pom.xml create mode 100644 src/main/java/com/example/application/SpringBootElaticjobApplication.java create mode 100644 src/main/java/com/example/application/job/MyDataflowJob.java create mode 100644 src/main/java/com/example/application/job/MyShardingJob.java create mode 100644 src/main/java/com/example/application/job/MySimpleJob.java create mode 100644 src/main/java/com/example/application/job/listener/MyNormalListener.java create mode 100644 src/main/java/com/example/autoconfig/DataflowJobAutoConfig.java create mode 100644 src/main/java/com/example/autoconfig/ElasticSimpleJob.java create mode 100644 src/main/java/com/example/autoconfig/ElatisDataflowJob.java create mode 100644 src/main/java/com/example/autoconfig/SimpleJobAutoConfig.java create mode 100644 src/main/java/com/example/autoconfig/ZookeeperAutoConfig.java create mode 100644 src/main/java/com/example/autoconfig/ZookeeperProperties.java create mode 100644 src/main/java/com/example/autoconfig/sharding/MyshardingStrategy.java create mode 100644 src/main/resources/META-INF/spring-configuration-metadata.json create mode 100644 src/main/resources/META-INF/spring.factories create mode 100644 src/main/resources/application.yml diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..221da7a --- /dev/null +++ b/.gitignore @@ -0,0 +1,47 @@ +# Created by .ignore support plugin (hsz.mobi) +### Java template +# Compiled class file +*.class + +# Log file +*.log + +# BlueJ files +*.ctxt + +# Mobile Tools for Java (J2ME) +.mtj.tmp/ + +# Package Files # +*.jar +*.war +*.nar +*.ear +*.zip +*.tar.gz +*.rar + +# virtual machine crash logs, see http://www.java.com/en/download/help/error_hotspot.xml +hs_err_pid* + +### Example user template template +### Example user template + +# IntelliJ project files +.idea +*.iml +out +gen +### Maven template +target/ +pom.xml.tag +pom.xml.releaseBackup +pom.xml.versionsBackup +pom.xml.next +release.properties +dependency-reduced-pom.xml +buildNumber.properties +.mvn/timing.properties +# https://github.com/takari/maven-wrapper#usage-without-binary-jar +.mvn/wrapper/maven-wrapper.jar + diff --git a/README.md b/README.md new file mode 100644 index 0000000..1b1573a --- /dev/null +++ b/README.md @@ -0,0 +1,7 @@ +## 自动提示配置信息参考 +https://docs.spring.io/spring-boot/docs/2.3.11.RELEASE/reference/html/appendix-configuration-metadata.html#configuration-metadata-annotation-processor-metadata-generation + + + + + diff --git a/pom.xml b/pom.xml new file mode 100644 index 0000000..b2d2b41 --- /dev/null +++ b/pom.xml @@ -0,0 +1,50 @@ + + + 4.0.0 + + com.example + elastic-job-springboot + 1.0-SNAPSHOT + + + 8 + 8 + + + + + + org.springframework.boot + spring-boot-starter-web + 2.3.2.RELEASE + + + + org.springframework.boot + spring-boot-starter-test + 2.3.2.RELEASE + + + + + org.springframework.boot + spring-boot-configuration-processor + true + 2.3.2.RELEASE + + + + org.projectlombok + lombok + 1.18.16 + + + + com.dangdang + elastic-job-lite-core + 2.1.5 + + + \ No newline at end of file diff --git a/src/main/java/com/example/application/SpringBootElaticjobApplication.java b/src/main/java/com/example/application/SpringBootElaticjobApplication.java new file mode 100644 index 0000000..ad84500 --- /dev/null +++ b/src/main/java/com/example/application/SpringBootElaticjobApplication.java @@ -0,0 +1,15 @@ +package com.example.application; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; + +/** + * @author q + */ +@SpringBootApplication +public class SpringBootElaticjobApplication { + + public static void main(String[] args) { + SpringApplication.run(SpringBootElaticjobApplication.class, args); + } +} \ No newline at end of file diff --git a/src/main/java/com/example/application/job/MyDataflowJob.java b/src/main/java/com/example/application/job/MyDataflowJob.java new file mode 100644 index 0000000..725d33a --- /dev/null +++ b/src/main/java/com/example/application/job/MyDataflowJob.java @@ -0,0 +1,71 @@ +package com.example.application.job; + +import com.dangdang.ddframe.job.api.ShardingContext; +import com.dangdang.ddframe.job.api.dataflow.DataflowJob; +import com.example.autoconfig.ElatisDataflowJob; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +/** + * @author q + */ +@Slf4j +@ElatisDataflowJob( + jobName = "MyDataflowJob", + cron = "0/3 * * * * ?", + shardingTotalCount = 2, + overwrite = true, + streamingProcess = true, + jobEvent = false +) +public class MyDataflowJob implements DataflowJob { + + private List list; + + { + list = Arrays.asList(1,2,3,4,5,6,7,8,9,10); + } + @Override + public List fetchData(ShardingContext shardingContext) { + + List result = new ArrayList<>(); + + // 数字 % 分片总数 == 当前分片数 + for (Integer index : list) { + if (index % shardingContext.getShardingTotalCount() == shardingContext.getShardingItem()){ + result.add(index); + break; + } + } + + // 模拟处理时间耗时 + try { + Thread.sleep(3000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + + log.info("我是分片项: {} , 我抓取的数据是: {} ",shardingContext.getShardingItem(), result); + + return result; + } + + @Override + public void processData(ShardingContext shardingContext, List result) { + +// list.removeAll(result); + + // 模拟处理时间耗时 + try { + Thread.sleep(5000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + + log.info("我是分片项: {}, 我是处理的数据是: {}", shardingContext.getShardingItem(), result); + } +} diff --git a/src/main/java/com/example/application/job/MyShardingJob.java b/src/main/java/com/example/application/job/MyShardingJob.java new file mode 100644 index 0000000..a165651 --- /dev/null +++ b/src/main/java/com/example/application/job/MyShardingJob.java @@ -0,0 +1,33 @@ +package com.example.application.job; + +import com.dangdang.ddframe.job.api.ShardingContext; +import com.dangdang.ddframe.job.api.simple.SimpleJob; +import com.example.autoconfig.ElasticSimpleJob; +import com.example.autoconfig.sharding.MyshardingStrategy; +import lombok.extern.slf4j.Slf4j; + +/** + * 测试自定义分片策略的Job + * @author q + */ +@Slf4j +@ElasticSimpleJob( + jobName = "MyShardingJob", + cron = "0/3 * * * * ?", + shardingTotalCount = 10, + overwrite = true, + jobStrategy = MyshardingStrategy.class, + jobEvent = false +) +public class MyShardingJob implements SimpleJob { + + + @Override + public void execute(ShardingContext shardingContext) { + log.info("{} : 我是分片项: {} , 总分片数 : {} ", + this.getClass().getSimpleName(), + shardingContext.getShardingItem(), + shardingContext.getShardingTotalCount() + ); + } +} diff --git a/src/main/java/com/example/application/job/MySimpleJob.java b/src/main/java/com/example/application/job/MySimpleJob.java new file mode 100644 index 0000000..42271a1 --- /dev/null +++ b/src/main/java/com/example/application/job/MySimpleJob.java @@ -0,0 +1,29 @@ +package com.example.application.job; + +import com.dangdang.ddframe.job.api.ShardingContext; +import com.dangdang.ddframe.job.api.simple.SimpleJob; +import com.example.autoconfig.ElasticSimpleJob; +import lombok.extern.slf4j.Slf4j; + +/** + * @author q + */ +@Slf4j +@ElasticSimpleJob( + jobName = "MySimpleJob", + cron = "0/3 * * * * ?", + shardingTotalCount = 2, + overwrite = true, + jobEvent = false +) +public class MySimpleJob implements SimpleJob { + + @Override + public void execute(ShardingContext shardingContext) { + log.info("{} : 我是分片项: {} , 总分片数 : {} ", + this.getClass().getSimpleName(), + shardingContext.getShardingItem(), + shardingContext.getShardingTotalCount() + ); + } +} diff --git a/src/main/java/com/example/application/job/listener/MyNormalListener.java b/src/main/java/com/example/application/job/listener/MyNormalListener.java new file mode 100644 index 0000000..2bb75e0 --- /dev/null +++ b/src/main/java/com/example/application/job/listener/MyNormalListener.java @@ -0,0 +1,27 @@ +package com.example.application.job.listener; + +import com.dangdang.ddframe.job.executor.ShardingContexts; +import com.dangdang.ddframe.job.lite.api.listener.ElasticJobListener; +import lombok.extern.slf4j.Slf4j; + + +/** + * @author q + */ +@Slf4j +public class MyNormalListener implements ElasticJobListener{ + + /** + * 作业执行前 + * @param shardingContexts + */ + @Override + public void beforeJobExecuted(ShardingContexts shardingContexts) { + log.info("我是 : {} 作业, 在方法前!", shardingContexts.getJobName()); + } + + @Override + public void afterJobExecuted(ShardingContexts shardingContexts) { + log.info("我是 : {} 作业, 在方法后!", shardingContexts.getJobName()); + } +} diff --git a/src/main/java/com/example/autoconfig/DataflowJobAutoConfig.java b/src/main/java/com/example/autoconfig/DataflowJobAutoConfig.java new file mode 100644 index 0000000..58eb1b9 --- /dev/null +++ b/src/main/java/com/example/autoconfig/DataflowJobAutoConfig.java @@ -0,0 +1,75 @@ +package com.example.autoconfig; + +import com.dangdang.ddframe.job.api.dataflow.DataflowJob; +import com.dangdang.ddframe.job.config.JobCoreConfiguration; +import com.dangdang.ddframe.job.config.dataflow.DataflowJobConfiguration; +import com.dangdang.ddframe.job.lite.api.JobScheduler; +import com.dangdang.ddframe.job.lite.api.listener.ElasticJobListener; +import com.dangdang.ddframe.job.lite.config.LiteJobConfiguration; +import com.dangdang.ddframe.job.reg.base.CoordinatorRegistryCenter; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.autoconfigure.AutoConfigureAfter; +import org.springframework.boot.autoconfigure.condition.ConditionalOnBean; +import org.springframework.context.ApplicationContext; +import org.springframework.context.annotation.Configuration; + +import javax.annotation.PostConstruct; +import java.util.Map; + +/** + * @author q + */ +@Configuration +@ConditionalOnBean(CoordinatorRegistryCenter.class) +@AutoConfigureAfter(ZookeeperAutoConfig.class) +public class DataflowJobAutoConfig { + + @Autowired + private CoordinatorRegistryCenter zkCenter; + + @Autowired + private ApplicationContext applicationContext; + + @PostConstruct + public void initDataflowJob(){ + Map beans = applicationContext.getBeansWithAnnotation(ElatisDataflowJob.class); + beans.forEach((key, instance) -> { + Class[] interfaces = instance.getClass().getInterfaces(); + + for (Class anInterface : interfaces) { + if (anInterface == DataflowJob.class) { + ElatisDataflowJob annotation = instance.getClass().getAnnotation(ElatisDataflowJob.class); + + String jobName = annotation.jobName(); + String cron = annotation.cron(); + int shardingTotalCount = annotation.shardingTotalCount(); + boolean overwrite = annotation.overwrite(); + boolean process = annotation.streamingProcess(); + Class jobStrategy = annotation.jobStrategy(); + Class[] jobListeners = annotation.jobListeners(); + + // Job核心配置 + JobCoreConfiguration jcc = JobCoreConfiguration + .newBuilder(jobName, cron, shardingTotalCount) + .build(); + + // job类型配置 + DataflowJobConfiguration djc = new DataflowJobConfiguration( + jcc, + instance.getClass().getCanonicalName(), + process + ); + + // job根配置 + LiteJobConfiguration ljc = LiteJobConfiguration + .newBuilder(djc) + .jobShardingStrategyClass(jobStrategy.getCanonicalName()) + .overwrite(overwrite) + .build(); + + new JobScheduler(zkCenter, ljc).init(); + } + } + }); + } +} diff --git a/src/main/java/com/example/autoconfig/ElasticSimpleJob.java b/src/main/java/com/example/autoconfig/ElasticSimpleJob.java new file mode 100644 index 0000000..38208c7 --- /dev/null +++ b/src/main/java/com/example/autoconfig/ElasticSimpleJob.java @@ -0,0 +1,54 @@ +package com.example.autoconfig; + +import com.dangdang.ddframe.job.lite.api.listener.ElasticJobListener; +import com.dangdang.ddframe.job.lite.api.strategy.impl.AverageAllocationJobShardingStrategy; +import org.springframework.stereotype.Component; + +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +/** + * @author q + */ +@Target(ElementType.TYPE) +@Retention(RetentionPolicy.RUNTIME) +@Component +public @interface ElasticSimpleJob { + + /** + * 任务名称 + */ + String jobName() default ""; + + /** + * cron 表达式 + */ + String cron() default ""; + + /** + * 分片总数 + */ + int shardingTotalCount() default 1; + + /** + * 是否刷新到Zookeeper + */ + boolean overwrite() default false; + + /** + * 使用的分片策略 + */ + Class jobStrategy() default AverageAllocationJobShardingStrategy.class; + + /** + * 是否使用事件追踪 + */ + boolean jobEvent() default false; + + /** + * 是否使用事件监听器 + */ + Class[] jobListeners() default {}; +} diff --git a/src/main/java/com/example/autoconfig/ElatisDataflowJob.java b/src/main/java/com/example/autoconfig/ElatisDataflowJob.java new file mode 100644 index 0000000..075edbb --- /dev/null +++ b/src/main/java/com/example/autoconfig/ElatisDataflowJob.java @@ -0,0 +1,59 @@ +package com.example.autoconfig; + +import com.dangdang.ddframe.job.lite.api.listener.ElasticJobListener; +import com.dangdang.ddframe.job.lite.api.strategy.impl.AverageAllocationJobShardingStrategy; +import org.springframework.stereotype.Component; + +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +/** + * @author q + */ +@Target(ElementType.TYPE) +@Retention(RetentionPolicy.RUNTIME) +@Component +public @interface ElatisDataflowJob { + + /** + * 任务名称 + */ + String jobName() default ""; + + /** + * cron 表达式 + */ + String cron() default ""; + + /** + * 分片总数 + */ + int shardingTotalCount() default 1; + + /** + * 是否刷新到Zookeeper + */ + boolean overwrite() default false; + + /** + * 是否流式处理 + */ + boolean streamingProcess() default false; + + /** + * 使用的分片策略 + */ + Class jobStrategy() default AverageAllocationJobShardingStrategy.class; + + /** + * 是否使用事件追踪 + */ + boolean jobEvent() default false; + + /** + * 是否使用事件监听器 + */ + Class[] jobListeners() default {}; +} diff --git a/src/main/java/com/example/autoconfig/SimpleJobAutoConfig.java b/src/main/java/com/example/autoconfig/SimpleJobAutoConfig.java new file mode 100644 index 0000000..887c61d --- /dev/null +++ b/src/main/java/com/example/autoconfig/SimpleJobAutoConfig.java @@ -0,0 +1,82 @@ +package com.example.autoconfig; + +import com.dangdang.ddframe.job.api.simple.SimpleJob; +import com.dangdang.ddframe.job.config.JobCoreConfiguration; +import com.dangdang.ddframe.job.config.simple.SimpleJobConfiguration; +import com.dangdang.ddframe.job.event.JobEventConfiguration; +import com.dangdang.ddframe.job.event.rdb.JobEventRdbConfiguration; +import com.dangdang.ddframe.job.lite.api.JobScheduler; +import com.dangdang.ddframe.job.lite.api.listener.ElasticJobListener; +import com.dangdang.ddframe.job.lite.config.LiteJobConfiguration; +import com.dangdang.ddframe.job.reg.base.CoordinatorRegistryCenter; +import com.example.autoconfig.sharding.MyshardingStrategy; +import org.quartz.JobDetail; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.autoconfigure.AutoConfigureAfter; +import org.springframework.boot.autoconfigure.condition.ConditionalOnBean; +import org.springframework.context.ApplicationContext; +import org.springframework.context.annotation.Configuration; + +import javax.annotation.PostConstruct; +import javax.sql.DataSource; +import java.util.Map; + +/** + * @author q + */ +@Configuration +@ConditionalOnBean(CoordinatorRegistryCenter.class) +@AutoConfigureAfter(ZookeeperAutoConfig.class) +public class SimpleJobAutoConfig { + + @Autowired + private CoordinatorRegistryCenter center; + + @Autowired + private ApplicationContext applicationContext; + +// @Autowired +// private DataSource dataSource; + + + @PostConstruct + public void initSimpleJob(){ + Map beans = applicationContext.getBeansWithAnnotation(ElasticSimpleJob.class); + + beans.forEach((key, instance) -> { + Class[] interfaces = instance.getClass().getInterfaces(); + for (Class anInterface : interfaces) { + if (anInterface == SimpleJob.class) { + ElasticSimpleJob annotation = instance.getClass().getAnnotation(ElasticSimpleJob.class); + String jobName = annotation.jobName(); + String cron = annotation.cron(); + int shardingTotalCount = annotation.shardingTotalCount(); + boolean overwrite = annotation.overwrite(); + Class jobStrategy = annotation.jobStrategy(); + boolean jobEvent = annotation.jobEvent(); + Class[] jobListeners = annotation.jobListeners(); + + JobCoreConfiguration jcc = JobCoreConfiguration + .newBuilder(jobName, cron, shardingTotalCount) + .build(); + + SimpleJobConfiguration sjc = new SimpleJobConfiguration( + jcc, + instance.getClass().getCanonicalName() + ); + + LiteJobConfiguration ljc = LiteJobConfiguration + .newBuilder(sjc) + .jobShardingStrategyClass(jobStrategy.getCanonicalName()) + .overwrite(overwrite) + .build(); + + // 配置数据源 +// JobEventConfiguration jec = new JobEventRdbConfiguration(dataSource); + + new JobScheduler(center, ljc).init(); + } + } + }); + } +} diff --git a/src/main/java/com/example/autoconfig/ZookeeperAutoConfig.java b/src/main/java/com/example/autoconfig/ZookeeperAutoConfig.java new file mode 100644 index 0000000..893441b --- /dev/null +++ b/src/main/java/com/example/autoconfig/ZookeeperAutoConfig.java @@ -0,0 +1,41 @@ +package com.example.autoconfig; + +import com.dangdang.ddframe.job.reg.base.CoordinatorRegistryCenter; +import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperConfiguration; +import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperRegistryCenter; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.boot.context.properties.EnableConfigurationProperties; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +/** + * 当有配置的时候就生效 + * @author q + */ +@ConditionalOnProperty("elasticjob.zookeeper.server-list") +@Configuration +@EnableConfigurationProperties(ZookeeperProperties.class) +public class ZookeeperAutoConfig { + + @Autowired + private ZookeeperProperties zookeeperProperties; + + /** + * Zookeeper 注册中心自动装配 + * @return Zookeeper注册中心实例 + */ + @Bean(initMethod = "init") + public CoordinatorRegistryCenter zkCenter(){ + + ZookeeperConfiguration zookeeperConfiguration = new ZookeeperConfiguration( + zookeeperProperties.getServerList(), + zookeeperProperties.getNamespace() + ); + + ZookeeperRegistryCenter zookeeperRegistryCenter = new ZookeeperRegistryCenter(zookeeperConfiguration); + + return zookeeperRegistryCenter; + } + +} diff --git a/src/main/java/com/example/autoconfig/ZookeeperProperties.java b/src/main/java/com/example/autoconfig/ZookeeperProperties.java new file mode 100644 index 0000000..234d3fa --- /dev/null +++ b/src/main/java/com/example/autoconfig/ZookeeperProperties.java @@ -0,0 +1,27 @@ +package com.example.autoconfig; + +import lombok.Getter; +import lombok.Setter; +import org.springframework.boot.context.properties.ConfigurationProperties; + +/** + * Zookeeper properties + * @author q + */ +@ConfigurationProperties(prefix = "elasticjob.zookeeper") +@Setter +@Getter +public class ZookeeperProperties { + + /** + * Zookeeper 地址列表 + */ + private String serverList; + + /** + * Zookeeper 命名空间 + */ + private String namespace; + + // TODO: 2021/6/2 0002 其他的属性在这里进行配置即可 +} diff --git a/src/main/java/com/example/autoconfig/sharding/MyshardingStrategy.java b/src/main/java/com/example/autoconfig/sharding/MyshardingStrategy.java new file mode 100644 index 0000000..eabfbad --- /dev/null +++ b/src/main/java/com/example/autoconfig/sharding/MyshardingStrategy.java @@ -0,0 +1,56 @@ +package com.example.autoconfig.sharding; + +import com.dangdang.ddframe.job.lite.api.strategy.JobInstance; +import com.dangdang.ddframe.job.lite.api.strategy.JobShardingStrategy; +import org.springframework.util.CollectionUtils; + +import java.util.*; + +/** + * 自定义分片策略 - 实现全轮询算法 + * + * @author q + */ +public class MyshardingStrategy implements JobShardingStrategy { + + + /** + * @param jobInstances 所有有效的服务 + * @param jobName 作业名称 + * @param shardingTotalCount 分片总数 + * @return 实例 所获取的分片项 + */ + @Override + public Map> sharding(List jobInstances, String jobName, int shardingTotalCount) { + + // 创建返回值容器 + Map> rtnMap = new HashMap<>(10); + + // 创建队列模型 + // fixme 这个队列可以自行进行选择 + ArrayDeque queue = new ArrayDeque<>(shardingTotalCount); + + for (int i = 0; i < shardingTotalCount; i++) { + queue.add(i); + } + + // 轮询 + while (queue.size() > 0) { + for (JobInstance jobInstance : jobInstances) { + if (queue.size() > 0){ + Integer shardingItem = queue.pop(); + List shardingItemList = rtnMap.get(jobInstance); + if (CollectionUtils.isEmpty(shardingItemList)){ + ArrayList list = new ArrayList<>(100); + list.add(shardingItem); + rtnMap.put(jobInstance, shardingItemList); + }else { + shardingItemList.add(shardingItem); + } + } + } + } + + return rtnMap; + } +} diff --git a/src/main/resources/META-INF/spring-configuration-metadata.json b/src/main/resources/META-INF/spring-configuration-metadata.json new file mode 100644 index 0000000..91e7898 --- /dev/null +++ b/src/main/resources/META-INF/spring-configuration-metadata.json @@ -0,0 +1,16 @@ +{ + "properties": [ + { + "name": "elasticjob.zookeeper.namespace", + "type": "java.lang.String", + "description": "Zookeeper 命名空间", + "sourceType": "com.example.autoconfig.ZookeeperProperties" + }, + { + "name": "elasticjob.zookeeper.server-list", + "type": "java.lang.String", + "description": "Zookeeper 服务列表", + "sourceType": "com.example.autoconfig.ZookeeperProperties" + } + ] +} \ No newline at end of file diff --git a/src/main/resources/META-INF/spring.factories b/src/main/resources/META-INF/spring.factories new file mode 100644 index 0000000..0d6988f --- /dev/null +++ b/src/main/resources/META-INF/spring.factories @@ -0,0 +1,4 @@ +org.springframework.boot.autoconfigure.EnableAutoConfiguration=\ + com.example.autoconfig.ZookeeperAutoConfig,\ + com.example.autoconfig.DataflowJobAutoConfig,\ + com.example.autoconfig.SimpleJobAutoConfig diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml new file mode 100644 index 0000000..25ce1fa --- /dev/null +++ b/src/main/resources/application.yml @@ -0,0 +1,7 @@ +server: + port: 8081 + +elasticjob: + zookeeper: + namespace: springboot-elasticjob + server-list: 192.168.72.100:2181 \ No newline at end of file