整理后的代码

master
wujingtao 3 years ago
parent ed174b1984
commit 389e35c6d2

@ -6,45 +6,79 @@
<groupId>com.example</groupId>
<artifactId>elastic-job-springboot</artifactId>
<version>1.0-SNAPSHOT</version>
<version>2.0-SNAPSHOT</version>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<version>2.3.2.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<version>2.3.2.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>
<optional>true</optional>
<version>2.3.2.RELEASE</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.16</version>
</dependency>
<dependency>
<groupId>com.dangdang</groupId>
<artifactId>elastic-job-lite-core</artifactId>
<version>2.1.5</version>
</dependency>
<dependency>
<groupId>com.dangdang</groupId>
<artifactId>elastic-job-lite-spring</artifactId>
<version>2.1.5</version>
</dependency>
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
<version>5.4.0</version>
</dependency>
</dependencies>
<build>
<finalName>${project.name}</finalName>
<resources>
<resource>
<directory>src/main/resources</directory>
<filtering>true</filtering>
</resource>
</resources>
<pluginManagement>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<version>2.3.2.RELEASE</version>
<configuration>
<executable>true</executable>
<jvmArguments>-Dfile.encoding=UTF-8</jvmArguments>
<finalName>${project.build.finalName}</finalName>
</configuration>
<executions>
<execution>
<goals>
<goal>repackage</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</pluginManagement>
</build>
</project>

@ -7,9 +7,9 @@ import org.springframework.boot.autoconfigure.SpringBootApplication;
* @author q
*/
@SpringBootApplication
public class SpringBootElaticjobApplication {
public class SpringBootElasticJobApplication {
public static void main(String[] args) {
SpringApplication.run(SpringBootElaticjobApplication.class, args);
SpringApplication.run(SpringBootElasticJobApplication.class, args);
}
}

@ -1,71 +0,0 @@
package com.example.application.job;
import com.dangdang.ddframe.job.api.ShardingContext;
import com.dangdang.ddframe.job.api.dataflow.DataflowJob;
import com.example.autoconfig.ElatisDataflowJob;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
/**
* @author q
*/
@Slf4j
@ElatisDataflowJob(
jobName = "MyDataflowJob",
cron = "0/3 * * * * ?",
shardingTotalCount = 2,
overwrite = true,
streamingProcess = true,
jobEvent = false
)
public class MyDataflowJob implements DataflowJob<Integer> {
private List<Integer> list;
{
list = Arrays.asList(1,2,3,4,5,6,7,8,9,10);
}
@Override
public List<Integer> fetchData(ShardingContext shardingContext) {
List<Integer> result = new ArrayList<>();
// 数字 % 分片总数 == 当前分片数
for (Integer index : list) {
if (index % shardingContext.getShardingTotalCount() == shardingContext.getShardingItem()){
result.add(index);
break;
}
}
// 模拟处理时间耗时
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.info("我是分片项: {} , 我抓取的数据是: {} ",shardingContext.getShardingItem(), result);
return result;
}
@Override
public void processData(ShardingContext shardingContext, List<Integer> result) {
// list.removeAll(result);
// 模拟处理时间耗时
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.info("我是分片项: {}, 我是处理的数据是: {}", shardingContext.getShardingItem(), result);
}
}

@ -1,33 +0,0 @@
package com.example.application.job;
import com.dangdang.ddframe.job.api.ShardingContext;
import com.dangdang.ddframe.job.api.simple.SimpleJob;
import com.example.autoconfig.ElasticSimpleJob;
import com.example.autoconfig.sharding.MyshardingStrategy;
import lombok.extern.slf4j.Slf4j;
/**
* Job
* @author q
*/
@Slf4j
@ElasticSimpleJob(
jobName = "MyShardingJob",
cron = "0/3 * * * * ?",
shardingTotalCount = 10,
overwrite = true,
jobStrategy = MyshardingStrategy.class,
jobEvent = false
)
public class MyShardingJob implements SimpleJob {
@Override
public void execute(ShardingContext shardingContext) {
log.info("{} : 我是分片项: {} , 总分片数 : {} ",
this.getClass().getSimpleName(),
shardingContext.getShardingItem(),
shardingContext.getShardingTotalCount()
);
}
}

@ -1,29 +0,0 @@
package com.example.application.job;
import com.dangdang.ddframe.job.api.ShardingContext;
import com.dangdang.ddframe.job.api.simple.SimpleJob;
import com.example.autoconfig.ElasticSimpleJob;
import lombok.extern.slf4j.Slf4j;
/**
* @author q
*/
@Slf4j
@ElasticSimpleJob(
jobName = "MySimpleJob",
cron = "0/3 * * * * ?",
shardingTotalCount = 2,
overwrite = true,
jobEvent = false
)
public class MySimpleJob implements SimpleJob {
@Override
public void execute(ShardingContext shardingContext) {
log.info("{} : 我是分片项: {} , 总分片数 : {} ",
this.getClass().getSimpleName(),
shardingContext.getShardingItem(),
shardingContext.getShardingTotalCount()
);
}
}

@ -1,5 +1,6 @@
package com.example.application.job.listener;
import cn.hutool.core.date.DateUtil;
import com.dangdang.ddframe.job.executor.ShardingContexts;
import com.dangdang.ddframe.job.lite.api.listener.ElasticJobListener;
import lombok.extern.slf4j.Slf4j;
@ -9,19 +10,20 @@ import lombok.extern.slf4j.Slf4j;
* @author q
*/
@Slf4j
public class MyNormalListener implements ElasticJobListener{
public class MyNormalListener implements ElasticJobListener {
/**
*
*
* @param shardingContexts
*/
@Override
public void beforeJobExecuted(ShardingContexts shardingContexts) {
log.info("我是 : {} 作业, 在方法前!", shardingContexts.getJobName());
log.info("我是 : {} 作业, {}开始执行!", shardingContexts.getJobName(), DateUtil.now());
}
@Override
public void afterJobExecuted(ShardingContexts shardingContexts) {
log.info("我是 : {} 作业, 在方法后!", shardingContexts.getJobName());
log.info("我是 : {} 作业, {}结束执行!", shardingContexts.getJobName(), DateUtil.now());
}
}

@ -31,14 +31,14 @@ public class DataflowJobAutoConfig {
private ApplicationContext applicationContext;
@PostConstruct
public void initDataflowJob(){
Map<String, Object> beans = applicationContext.getBeansWithAnnotation(ElatisDataflowJob.class);
public void initDataflowJob() {
Map<String, Object> beans = applicationContext.getBeansWithAnnotation(ElasticDataflowJob.class);
beans.forEach((key, instance) -> {
Class<?>[] interfaces = instance.getClass().getInterfaces();
for (Class<?> anInterface : interfaces) {
if (anInterface == DataflowJob.class) {
ElatisDataflowJob annotation = instance.getClass().getAnnotation(ElatisDataflowJob.class);
ElasticDataflowJob annotation = instance.getClass().getAnnotation(ElasticDataflowJob.class);
String jobName = annotation.jobName();
String cron = annotation.cron();

@ -15,7 +15,7 @@ import java.lang.annotation.Target;
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Component
public @interface ElatisDataflowJob {
public @interface ElasticDataflowJob {
/**
*

@ -51,4 +51,5 @@ public @interface ElasticSimpleJob {
* 使
*/
Class<? extends ElasticJobListener>[] jobListeners() default {};
}

@ -3,46 +3,41 @@ package com.example.autoconfig;
import com.dangdang.ddframe.job.api.simple.SimpleJob;
import com.dangdang.ddframe.job.config.JobCoreConfiguration;
import com.dangdang.ddframe.job.config.simple.SimpleJobConfiguration;
import com.dangdang.ddframe.job.event.JobEventConfiguration;
import com.dangdang.ddframe.job.event.rdb.JobEventRdbConfiguration;
import com.dangdang.ddframe.job.lite.api.JobScheduler;
import com.dangdang.ddframe.job.lite.api.listener.ElasticJobListener;
import com.dangdang.ddframe.job.lite.config.LiteJobConfiguration;
import com.dangdang.ddframe.job.reg.base.CoordinatorRegistryCenter;
import com.example.autoconfig.sharding.MyshardingStrategy;
import org.quartz.JobDetail;
import org.springframework.beans.factory.annotation.Autowired;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Configuration;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import javax.sql.DataSource;
import java.util.Map;
/**
* @author q
*/
@Slf4j
@Configuration
@ConditionalOnBean(CoordinatorRegistryCenter.class)
@AutoConfigureAfter(ZookeeperAutoConfig.class)
public class SimpleJobAutoConfig {
@Autowired
@Resource
private CoordinatorRegistryCenter center;
@Autowired
@Resource
private ApplicationContext applicationContext;
// @Autowired
// @Resource
// private DataSource dataSource;
@PostConstruct
public void initSimpleJob(){
public void initSimpleJob() {
Map<String, Object> beans = applicationContext.getBeansWithAnnotation(ElasticSimpleJob.class);
beans.forEach((key, instance) -> {
Class<?>[] interfaces = instance.getClass().getInterfaces();
for (Class<?> anInterface : interfaces) {
@ -55,28 +50,25 @@ public class SimpleJobAutoConfig {
Class<?> jobStrategy = annotation.jobStrategy();
boolean jobEvent = annotation.jobEvent();
Class<? extends ElasticJobListener>[] jobListeners = annotation.jobListeners();
JobCoreConfiguration jcc = JobCoreConfiguration
.newBuilder(jobName, cron, shardingTotalCount)
.build();
SimpleJobConfiguration sjc = new SimpleJobConfiguration(
jcc,
instance.getClass().getCanonicalName()
);
LiteJobConfiguration ljc = LiteJobConfiguration
.newBuilder(sjc)
.jobShardingStrategyClass(jobStrategy.getCanonicalName())
.overwrite(overwrite)
.build();
// 配置数据源
// JobEventConfiguration jec = new JobEventRdbConfiguration(dataSource);
//配置监听
// MyNormalListener myNormalListener = new MyNormalListener();
new JobScheduler(center, ljc).init();
}
}
});
}
}
}

@ -3,14 +3,16 @@ package com.example.autoconfig;
import com.dangdang.ddframe.job.reg.base.CoordinatorRegistryCenter;
import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperConfiguration;
import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperRegistryCenter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import javax.annotation.Resource;
/**
*
*
* @author q
*/
@ConditionalOnProperty("elasticjob.zookeeper.server-list")
@ -18,24 +20,21 @@ import org.springframework.context.annotation.Configuration;
@EnableConfigurationProperties(ZookeeperProperties.class)
public class ZookeeperAutoConfig {
@Autowired
@Resource
private ZookeeperProperties zookeeperProperties;
/**
* Zookeeper
*
* @return Zookeeper
*/
@Bean(initMethod = "init")
public CoordinatorRegistryCenter zkCenter(){
public CoordinatorRegistryCenter zkCenter() {
ZookeeperConfiguration zookeeperConfiguration = new ZookeeperConfiguration(
zookeeperProperties.getServerList(),
zookeeperProperties.getNamespace()
);
ZookeeperRegistryCenter zookeeperRegistryCenter = new ZookeeperRegistryCenter(zookeeperConfiguration);
return zookeeperRegistryCenter;
return new ZookeeperRegistryCenter(zookeeperConfiguration);
}
}

@ -6,6 +6,7 @@ import org.springframework.boot.context.properties.ConfigurationProperties;
/**
* Zookeeper properties
*
* @author q
*/
@ConfigurationProperties(prefix = "elasticjob.zookeeper")
@ -22,6 +23,4 @@ public class ZookeeperProperties {
* Zookeeper
*/
private String namespace;
// TODO: 2021/6/2 0002 其他的属性在这里进行配置即可
}

@ -4,4 +4,4 @@ server:
elasticjob:
zookeeper:
namespace: springboot-elasticjob
server-list: 192.168.72.100:2181
server-list: localhost:2181

Loading…
Cancel
Save