diff --git a/pom.xml b/pom.xml index b2d2b41..b565c12 100644 --- a/pom.xml +++ b/pom.xml @@ -6,45 +6,79 @@ com.example elastic-job-springboot - 1.0-SNAPSHOT + 2.0-SNAPSHOT 8 8 - org.springframework.boot spring-boot-starter-web 2.3.2.RELEASE - org.springframework.boot spring-boot-starter-test 2.3.2.RELEASE - - org.springframework.boot spring-boot-configuration-processor true 2.3.2.RELEASE - org.projectlombok lombok 1.18.16 - com.dangdang elastic-job-lite-core 2.1.5 + + com.dangdang + elastic-job-lite-spring + 2.1.5 + + + cn.hutool + hutool-all + 5.4.0 + + + ${project.name} + + + src/main/resources + true + + + + + + org.springframework.boot + spring-boot-maven-plugin + 2.3.2.RELEASE + + true + -Dfile.encoding=UTF-8 + ${project.build.finalName} + + + + + repackage + + + + + + + \ No newline at end of file diff --git a/src/main/java/com/example/application/SpringBootElaticjobApplication.java b/src/main/java/com/example/application/SpringBootElasticJobApplication.java similarity index 66% rename from src/main/java/com/example/application/SpringBootElaticjobApplication.java rename to src/main/java/com/example/application/SpringBootElasticJobApplication.java index ad84500..af2bba7 100644 --- a/src/main/java/com/example/application/SpringBootElaticjobApplication.java +++ b/src/main/java/com/example/application/SpringBootElasticJobApplication.java @@ -7,9 +7,9 @@ import org.springframework.boot.autoconfigure.SpringBootApplication; * @author q */ @SpringBootApplication -public class SpringBootElaticjobApplication { +public class SpringBootElasticJobApplication { public static void main(String[] args) { - SpringApplication.run(SpringBootElaticjobApplication.class, args); + SpringApplication.run(SpringBootElasticJobApplication.class, args); } } \ No newline at end of file diff --git a/src/main/java/com/example/application/job/MyDataflowJob.java b/src/main/java/com/example/application/job/MyDataflowJob.java deleted file mode 100644 index 725d33a..0000000 --- a/src/main/java/com/example/application/job/MyDataflowJob.java +++ /dev/null @@ -1,71 +0,0 @@ -package com.example.application.job; - -import com.dangdang.ddframe.job.api.ShardingContext; -import com.dangdang.ddframe.job.api.dataflow.DataflowJob; -import com.example.autoconfig.ElatisDataflowJob; -import lombok.extern.slf4j.Slf4j; -import org.springframework.stereotype.Component; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; - -/** - * @author q - */ -@Slf4j -@ElatisDataflowJob( - jobName = "MyDataflowJob", - cron = "0/3 * * * * ?", - shardingTotalCount = 2, - overwrite = true, - streamingProcess = true, - jobEvent = false -) -public class MyDataflowJob implements DataflowJob { - - private List list; - - { - list = Arrays.asList(1,2,3,4,5,6,7,8,9,10); - } - @Override - public List fetchData(ShardingContext shardingContext) { - - List result = new ArrayList<>(); - - // 数字 % 分片总数 == 当前分片数 - for (Integer index : list) { - if (index % shardingContext.getShardingTotalCount() == shardingContext.getShardingItem()){ - result.add(index); - break; - } - } - - // 模拟处理时间耗时 - try { - Thread.sleep(3000); - } catch (InterruptedException e) { - e.printStackTrace(); - } - - log.info("我是分片项: {} , 我抓取的数据是: {} ",shardingContext.getShardingItem(), result); - - return result; - } - - @Override - public void processData(ShardingContext shardingContext, List result) { - -// list.removeAll(result); - - // 模拟处理时间耗时 - try { - Thread.sleep(5000); - } catch (InterruptedException e) { - e.printStackTrace(); - } - - log.info("我是分片项: {}, 我是处理的数据是: {}", shardingContext.getShardingItem(), result); - } -} diff --git a/src/main/java/com/example/application/job/MyShardingJob.java b/src/main/java/com/example/application/job/MyShardingJob.java deleted file mode 100644 index a165651..0000000 --- a/src/main/java/com/example/application/job/MyShardingJob.java +++ /dev/null @@ -1,33 +0,0 @@ -package com.example.application.job; - -import com.dangdang.ddframe.job.api.ShardingContext; -import com.dangdang.ddframe.job.api.simple.SimpleJob; -import com.example.autoconfig.ElasticSimpleJob; -import com.example.autoconfig.sharding.MyshardingStrategy; -import lombok.extern.slf4j.Slf4j; - -/** - * 测试自定义分片策略的Job - * @author q - */ -@Slf4j -@ElasticSimpleJob( - jobName = "MyShardingJob", - cron = "0/3 * * * * ?", - shardingTotalCount = 10, - overwrite = true, - jobStrategy = MyshardingStrategy.class, - jobEvent = false -) -public class MyShardingJob implements SimpleJob { - - - @Override - public void execute(ShardingContext shardingContext) { - log.info("{} : 我是分片项: {} , 总分片数 : {} ", - this.getClass().getSimpleName(), - shardingContext.getShardingItem(), - shardingContext.getShardingTotalCount() - ); - } -} diff --git a/src/main/java/com/example/application/job/MySimpleJob.java b/src/main/java/com/example/application/job/MySimpleJob.java deleted file mode 100644 index 42271a1..0000000 --- a/src/main/java/com/example/application/job/MySimpleJob.java +++ /dev/null @@ -1,29 +0,0 @@ -package com.example.application.job; - -import com.dangdang.ddframe.job.api.ShardingContext; -import com.dangdang.ddframe.job.api.simple.SimpleJob; -import com.example.autoconfig.ElasticSimpleJob; -import lombok.extern.slf4j.Slf4j; - -/** - * @author q - */ -@Slf4j -@ElasticSimpleJob( - jobName = "MySimpleJob", - cron = "0/3 * * * * ?", - shardingTotalCount = 2, - overwrite = true, - jobEvent = false -) -public class MySimpleJob implements SimpleJob { - - @Override - public void execute(ShardingContext shardingContext) { - log.info("{} : 我是分片项: {} , 总分片数 : {} ", - this.getClass().getSimpleName(), - shardingContext.getShardingItem(), - shardingContext.getShardingTotalCount() - ); - } -} diff --git a/src/main/java/com/example/application/job/listener/MyNormalListener.java b/src/main/java/com/example/application/job/listener/MyNormalListener.java index 2bb75e0..e464896 100644 --- a/src/main/java/com/example/application/job/listener/MyNormalListener.java +++ b/src/main/java/com/example/application/job/listener/MyNormalListener.java @@ -1,5 +1,6 @@ package com.example.application.job.listener; +import cn.hutool.core.date.DateUtil; import com.dangdang.ddframe.job.executor.ShardingContexts; import com.dangdang.ddframe.job.lite.api.listener.ElasticJobListener; import lombok.extern.slf4j.Slf4j; @@ -9,19 +10,20 @@ import lombok.extern.slf4j.Slf4j; * @author q */ @Slf4j -public class MyNormalListener implements ElasticJobListener{ +public class MyNormalListener implements ElasticJobListener { /** * 作业执行前 + * * @param shardingContexts */ @Override public void beforeJobExecuted(ShardingContexts shardingContexts) { - log.info("我是 : {} 作业, 在方法前!", shardingContexts.getJobName()); + log.info("我是 : {} 作业, {}开始执行!", shardingContexts.getJobName(), DateUtil.now()); } @Override public void afterJobExecuted(ShardingContexts shardingContexts) { - log.info("我是 : {} 作业, 在方法后!", shardingContexts.getJobName()); + log.info("我是 : {} 作业, {}结束执行!", shardingContexts.getJobName(), DateUtil.now()); } } diff --git a/src/main/java/com/example/autoconfig/DataflowJobAutoConfig.java b/src/main/java/com/example/autoconfig/DataflowJobAutoConfig.java index 58eb1b9..eb87ce6 100644 --- a/src/main/java/com/example/autoconfig/DataflowJobAutoConfig.java +++ b/src/main/java/com/example/autoconfig/DataflowJobAutoConfig.java @@ -31,14 +31,14 @@ public class DataflowJobAutoConfig { private ApplicationContext applicationContext; @PostConstruct - public void initDataflowJob(){ - Map beans = applicationContext.getBeansWithAnnotation(ElatisDataflowJob.class); + public void initDataflowJob() { + Map beans = applicationContext.getBeansWithAnnotation(ElasticDataflowJob.class); beans.forEach((key, instance) -> { Class[] interfaces = instance.getClass().getInterfaces(); for (Class anInterface : interfaces) { if (anInterface == DataflowJob.class) { - ElatisDataflowJob annotation = instance.getClass().getAnnotation(ElatisDataflowJob.class); + ElasticDataflowJob annotation = instance.getClass().getAnnotation(ElasticDataflowJob.class); String jobName = annotation.jobName(); String cron = annotation.cron(); diff --git a/src/main/java/com/example/autoconfig/ElatisDataflowJob.java b/src/main/java/com/example/autoconfig/ElasticDataflowJob.java similarity index 96% rename from src/main/java/com/example/autoconfig/ElatisDataflowJob.java rename to src/main/java/com/example/autoconfig/ElasticDataflowJob.java index 075edbb..15eb464 100644 --- a/src/main/java/com/example/autoconfig/ElatisDataflowJob.java +++ b/src/main/java/com/example/autoconfig/ElasticDataflowJob.java @@ -15,7 +15,7 @@ import java.lang.annotation.Target; @Target(ElementType.TYPE) @Retention(RetentionPolicy.RUNTIME) @Component -public @interface ElatisDataflowJob { +public @interface ElasticDataflowJob { /** * 任务名称 diff --git a/src/main/java/com/example/autoconfig/ElasticSimpleJob.java b/src/main/java/com/example/autoconfig/ElasticSimpleJob.java index 38208c7..b69a2c7 100644 --- a/src/main/java/com/example/autoconfig/ElasticSimpleJob.java +++ b/src/main/java/com/example/autoconfig/ElasticSimpleJob.java @@ -51,4 +51,5 @@ public @interface ElasticSimpleJob { * 是否使用事件监听器 */ Class[] jobListeners() default {}; + } diff --git a/src/main/java/com/example/autoconfig/SimpleJobAutoConfig.java b/src/main/java/com/example/autoconfig/SimpleJobAutoConfig.java index 887c61d..545557a 100644 --- a/src/main/java/com/example/autoconfig/SimpleJobAutoConfig.java +++ b/src/main/java/com/example/autoconfig/SimpleJobAutoConfig.java @@ -3,46 +3,41 @@ package com.example.autoconfig; import com.dangdang.ddframe.job.api.simple.SimpleJob; import com.dangdang.ddframe.job.config.JobCoreConfiguration; import com.dangdang.ddframe.job.config.simple.SimpleJobConfiguration; -import com.dangdang.ddframe.job.event.JobEventConfiguration; -import com.dangdang.ddframe.job.event.rdb.JobEventRdbConfiguration; import com.dangdang.ddframe.job.lite.api.JobScheduler; import com.dangdang.ddframe.job.lite.api.listener.ElasticJobListener; import com.dangdang.ddframe.job.lite.config.LiteJobConfiguration; import com.dangdang.ddframe.job.reg.base.CoordinatorRegistryCenter; -import com.example.autoconfig.sharding.MyshardingStrategy; -import org.quartz.JobDetail; -import org.springframework.beans.factory.annotation.Autowired; +import lombok.extern.slf4j.Slf4j; import org.springframework.boot.autoconfigure.AutoConfigureAfter; import org.springframework.boot.autoconfigure.condition.ConditionalOnBean; import org.springframework.context.ApplicationContext; import org.springframework.context.annotation.Configuration; import javax.annotation.PostConstruct; +import javax.annotation.Resource; import javax.sql.DataSource; import java.util.Map; /** * @author q */ +@Slf4j @Configuration @ConditionalOnBean(CoordinatorRegistryCenter.class) @AutoConfigureAfter(ZookeeperAutoConfig.class) public class SimpleJobAutoConfig { - @Autowired + @Resource private CoordinatorRegistryCenter center; - @Autowired + @Resource private ApplicationContext applicationContext; - -// @Autowired +// @Resource // private DataSource dataSource; - @PostConstruct - public void initSimpleJob(){ + public void initSimpleJob() { Map beans = applicationContext.getBeansWithAnnotation(ElasticSimpleJob.class); - beans.forEach((key, instance) -> { Class[] interfaces = instance.getClass().getInterfaces(); for (Class anInterface : interfaces) { @@ -55,28 +50,25 @@ public class SimpleJobAutoConfig { Class jobStrategy = annotation.jobStrategy(); boolean jobEvent = annotation.jobEvent(); Class[] jobListeners = annotation.jobListeners(); - JobCoreConfiguration jcc = JobCoreConfiguration .newBuilder(jobName, cron, shardingTotalCount) .build(); - SimpleJobConfiguration sjc = new SimpleJobConfiguration( jcc, instance.getClass().getCanonicalName() ); - LiteJobConfiguration ljc = LiteJobConfiguration .newBuilder(sjc) .jobShardingStrategyClass(jobStrategy.getCanonicalName()) .overwrite(overwrite) .build(); - // 配置数据源 // JobEventConfiguration jec = new JobEventRdbConfiguration(dataSource); - + //配置监听 +// MyNormalListener myNormalListener = new MyNormalListener(); new JobScheduler(center, ljc).init(); } } }); } -} +} \ No newline at end of file diff --git a/src/main/java/com/example/autoconfig/ZookeeperAutoConfig.java b/src/main/java/com/example/autoconfig/ZookeeperAutoConfig.java index 893441b..73330af 100644 --- a/src/main/java/com/example/autoconfig/ZookeeperAutoConfig.java +++ b/src/main/java/com/example/autoconfig/ZookeeperAutoConfig.java @@ -3,14 +3,16 @@ package com.example.autoconfig; import com.dangdang.ddframe.job.reg.base.CoordinatorRegistryCenter; import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperConfiguration; import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperRegistryCenter; -import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; +import javax.annotation.Resource; + /** * 当有配置的时候就生效 + * * @author q */ @ConditionalOnProperty("elasticjob.zookeeper.server-list") @@ -18,24 +20,21 @@ import org.springframework.context.annotation.Configuration; @EnableConfigurationProperties(ZookeeperProperties.class) public class ZookeeperAutoConfig { - @Autowired + @Resource private ZookeeperProperties zookeeperProperties; /** * Zookeeper 注册中心自动装配 + * * @return Zookeeper注册中心实例 */ @Bean(initMethod = "init") - public CoordinatorRegistryCenter zkCenter(){ - + public CoordinatorRegistryCenter zkCenter() { ZookeeperConfiguration zookeeperConfiguration = new ZookeeperConfiguration( zookeeperProperties.getServerList(), zookeeperProperties.getNamespace() ); - - ZookeeperRegistryCenter zookeeperRegistryCenter = new ZookeeperRegistryCenter(zookeeperConfiguration); - - return zookeeperRegistryCenter; + return new ZookeeperRegistryCenter(zookeeperConfiguration); } } diff --git a/src/main/java/com/example/autoconfig/ZookeeperProperties.java b/src/main/java/com/example/autoconfig/ZookeeperProperties.java index 234d3fa..0b5dee9 100644 --- a/src/main/java/com/example/autoconfig/ZookeeperProperties.java +++ b/src/main/java/com/example/autoconfig/ZookeeperProperties.java @@ -6,6 +6,7 @@ import org.springframework.boot.context.properties.ConfigurationProperties; /** * Zookeeper properties + * * @author q */ @ConfigurationProperties(prefix = "elasticjob.zookeeper") @@ -22,6 +23,4 @@ public class ZookeeperProperties { * Zookeeper 命名空间 */ private String namespace; - - // TODO: 2021/6/2 0002 其他的属性在这里进行配置即可 } diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index 25ce1fa..e7ee043 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -4,4 +4,4 @@ server: elasticjob: zookeeper: namespace: springboot-elasticjob - server-list: 192.168.72.100:2181 \ No newline at end of file + server-list: localhost:2181