Elastic Job是依托于ZK的非中心化分片定时任务框架
本文我们简单了解下分布式调度的概念和理念,以及初步学习Elastic Job
可以先看下文
目录
分布式任务调度概述
为什么需要分布式任务?
JDK、Spring 提供的单机版定时任务无法满足日益复杂的互联网业务需求
单机版存在性能瓶颈,没有容灾,不能动态启停,不好管理
分布式任务解决了什么?
分布式任务支持集群调度,分片处理定时作业,高可用,一般都具有动态启停的功能和可视化界面的管理
分布式任务框架有哪些?
常见的有Quartz、Elastic Job、XXL-Job
Elastic Job和XXL-Job其实内部也是基于Quartz做的扩展
提前了解Cron表达式
例如: 0 * * * * ? 七个域 秒 分 时 日 月 星期 年 六个域(常用) 秒 分 时 日 月 星期
字段 允许值 允许的特殊字符
秒(Seconds) 0~59的整数 , – * / 四个字符
分(Minutes) 0~59的整数 , – * / 四个字符
小时(Hours) 0~23的整数 , – * / 四个字符
日期(DayofMonth) 1~31的整数(但是你需要考虑你月的天数) ,- * ? / L W C 八个字符
月份(Month) 1~12的整数或者 JAN-DEC , – * / 四个字符
星期(DayofWeek) 1~7的整数或者 SUN-SAT (1=SUN) , – * ? / L C # 八个字符
年(可选,留空)(Year) 1970~2099 , – * / 四个字符
一些例子:
(0)0/20 * * * * ? 表示每20秒 调整任务
(1)0 0 2 1 * ? 表示在每月的1日的凌晨2点调整任务
(2)0 15 10 ? * MON-FRI 表示周一到周五每天上午10:15执行作业
(3)0 15 10 ? 6L 2002-2006 表示2002-2006年的每个月的最后一个星期五上午10:15执行作
(4)0 0 10,14,16 * * ? 每天上午10点,下午2点,4点
(5)0 0/30 9-17 * * ? 朝九晚五工作时间内每半小时
(6)0 0 12 ? * WED 表示每个星期三中午12点
(7)0 0 12 * * ? 每天中午12点触发
(8)0 15 10 ? * * 每天上午10:15触发
(9)0 15 10 * * ? 每天上午10:15触发
(10)0 15 10 * * ? * 每天上午10:15触发
(11)0 15 10 * * ? 2005 2005年的每天上午10:15触发
(12)0 * 14 * * ? 在每天下午2点到下午2:59期间的每1分钟触发
(13)0 0/5 14 * * ? 在每天下午2点到下午2:55期间的每5分钟触发
(14)0 0/5 14,18 * * ? 在每天下午2点到2:55期间和下午6点到6:55期间的每5分钟触发
(15)0 0-5 14 * * ? 在每天下午2点到下午2:05期间的每1分钟触发
(16)0 10,44 14 ? 3 WED 每年三月的星期三的下午2:10和2:44触发
(17)0 15 10 ? * MON-FRI 周一至周五的上午10:15触发
(18)0 15 10 15 * ? 每月15日上午10:15触发
(19)0 15 10 L * ? 每月最后一日的上午10:15触发
(20)0 15 10 ? * 6L 每月的最后一个星期五上午10:15触发
(21)0 15 10 ? * 6L 2002-2005 2002年至2005年的每月的最后一个星期五上午10:15触发
(22)0 15 10 ? * 6#3 每月的第三个星期五上午10:15触发
cron表达式生成:
Elastic Job介绍
Elastic-Job是一个分布式调度解决方案,由两个相互独立的子项目Elastic-Job-Lite和Elastic-Job-Cloud组成。
外部依赖Zk做注册中心,基于zk做任务调度
支持任务分片、支持弹性扩缩容
功能列表:
- 分布式调度协调
- 弹性扩容缩容
- 失效转移
- 错过执行作业重触发
- 作业分片一致性,保证同一分片在分布式环境中仅一个执行实例
- 自诊断并修复分布式不稳定造成的问题
- 支持并行调度
- 支持作业生命周期操作
- 丰富的作业类型
- Spring整合以及命名空间提供
- 运维平台
本文主要跑下Elastic-Job-Lite的demo
Elastic Job 环境准备
首先java,maven就不说了
elastic job依赖zk,我们就先整个zk
先启个本地的ZK, 我的是直接在mac上整,当然云服务器更好。
brew install zookeeper
连接客户端
zkCli
再来个zk的可视化工具
下载地址:
https://issues.apache.org/jira/secure/attachment/12436620/ZooInspector.zip
下载完后解压缩
启动后是这个鸟样
于是zk就完活了
Spring Boot 整合 Elastic Job
引入依赖
<dependency>
<groupId>com.purgeteam</groupId>
<artifactId>elasticjob-spring-boot-starter</artifactId>
<version>0.1.1.RELEASE</version>
</dependency>
配置 application.yml
spring: elasticjob: datasource: # job需要的记录数据源 url: jdbc:mysql://39.97.47.254:3306/mydb?useUnicode=true&characterEncoding=utf-8&verifyServerCertificate=false&useSSL=false&requireSSL=false driver-class-name: com.mysql.cj.jdbc.Driver username: root password: regCenter: # 注册中心 serverList: 127.0.0.1:2181 namespace: elasticJobDemo
注册中心就是本地起的zk
定时任务:
package pers.wmx.springbootfreemarkerdemo.elasticjob;
import java.text.SimpleDateFormat;
import java.util.Date;
import org.springframework.stereotype.Component;
import com.dangdang.ddframe.job.api.ShardingContext;
import com.dangdang.ddframe.job.api.simple.SimpleJob;
/**
* @author wangmingxin03
* Created on 2021-07-08
*/
@Component
public class MySimpleJob implements SimpleJob {
@Override
public void execute(ShardingContext context) {
System.out.println(
String.format("分片项 ShardingItem: %s | 运行时间: %s | 线程ID: %s | 分片参数: %s ",
context.getShardingItem(),
new SimpleDateFormat("HH:mm:ss").format(new Date()),
Thread.currentThread().getId(),
context.getShardingParameter())
);
}
}
任务配置:
package pers.wmx.springbootfreemarkerdemo.elasticjob;
import javax.annotation.Resource;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import com.dangdang.ddframe.job.event.JobEventConfiguration;
import com.dangdang.ddframe.job.lite.api.JobScheduler;
import com.dangdang.ddframe.job.lite.config.LiteJobConfiguration;
import com.dangdang.ddframe.job.lite.spring.api.SpringJobScheduler;
import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperRegistryCenter;
import com.purgeteam.elasticjob.starter.util.ElasticJobUtils;
/**
* @author wangmingxin03
* Created on 2021-07-08
*/
@Configuration
public class MyJobConfig {
// job 名称
private static final String JOB_NAME = "MySimpleJob";
// 定时器cron参数
private static final String CRON = "0/5 * * * * ?";
// 定时器分片
private static final int SHARDING_TOTAL_COUNT = 2;
// 分片参数
private static final String SHARDING_ITEM_PARAMETERS = "0=Beijing,1=Shanghai";
// 自定义参数
private static final String JOB_PARAMETERS = "parameter";
@Resource
private ZookeeperRegistryCenter regCenter;
@Resource
private JobEventConfiguration jobEventConfiguration;
@Bean(initMethod = "init")
public JobScheduler mySimpleJobScheduler(final MySimpleJob mySimpleJob) {
LiteJobConfiguration liteJobConfiguration = ElasticJobUtils
.getLiteJobConfiguration(mySimpleJob.getClass(), JOB_NAME, CRON,
SHARDING_TOTAL_COUNT, SHARDING_ITEM_PARAMETERS, JOB_PARAMETERS);
// 参数:1.定时器实例,2.注册中心类,3.LiteJobConfiguration,
// 3.历史轨迹(不需要可以省略)
return new SpringJobScheduler(mySimpleJob, regCenter, liteJobConfiguration, jobEventConfiguration);
}
}
getLiteJobConfiguration
/**
* 获取 {@link LiteJobConfiguration} 对象
*
* @param jobClass 定时器实现类
* @param jobName 定时器名称
* @param cron 定时参数
* @param shardingTotalCount 作业分片总数
* @param shardingItemParameters 当前参数 可以为null
* @param jobParameters 作业自定义参数 可以为null
* @return {@link LiteJobConfiguration}
*/
@SuppressWarnings("all")
public static LiteJobConfiguration getLiteJobConfiguration(Class<? extends SimpleJob> jobClass,
final String jobName,
final String cron,
final int shardingTotalCount,
final String shardingItemParameters,
final String jobParameters) {
启起来Spring Boot
定时任务开始跑起来了
我拷贝一下工程,再起一个分片
看下日志变化
分片任务1:
分片任务2:
刚好是2个shard,获取了各自的分片参数
看下db存储:
最后再看下ZK存储
在看下zk存储的 job config
{“jobName”:”MySimpleJob”,”jobClass”:”pers.wmx.springbootfreemarkerdemo.elasticjob.MySimpleJob”,”jobType”:”SIMPLE”,”cron”:”0/5 * * * * ?”,”shardingTotalCount”:2,”shardingItemParameters”:”0\u003dBeijing,1\u003dShanghai,2\u003dGuangzhou”,”jobParameter”:”parameter”,”failover”:false,”misfire”:true,”description”:””,”jobProperties”:{“job_exception_handler”:”com.dangdang.ddframe.job.executor.handler.impl.DefaultJobExceptionHandler”,”executor_service_handler”:”com.dangdang.ddframe.job.executor.handler.impl.DefaultExecutorServiceHandler”},”monitorExecution”:true,”maxTimeDiffSeconds”:-1,”monitorPort”:-1,”jobShardingStrategyClass”:””,”reconcileIntervalMinutes”:10,”disabled”:false,”overwrite”:true}
转载请注明:汪明鑫的个人博客 » Elastic Job实践
说点什么
您将是第一位评论人!