Welcome everyone

Elastic Job实践

分布式 汪明鑫 741浏览 0评论

Elastic Job是依托于ZK的非中心化分片定时任务框架

本文我们简单了解下分布式调度的概念和理念,以及初步学习Elastic Job

 

可以先看下文

 

分布式任务调度概述

为什么需要分布式任务?

JDK、Spring 提供的单机版定时任务无法满足日益复杂的互联网业务需求

单机版存在性能瓶颈,没有容灾,不能动态启停,不好管理

 

分布式任务解决了什么?

分布式任务支持集群调度,分片处理定时作业,高可用,一般都具有动态启停的功能和可视化界面的管理

 

分布式任务框架有哪些?

常见的有Quartz、Elastic Job、XXL-Job

Elastic Job和XXL-Job其实内部也是基于Quartz做的扩展

 

提前了解Cron表达式

例如: 0 * * * * ?

七个域 秒 分 时 日 月 星期 年
六个域(常用) 秒 分 时 日 月 星期

 

字段               允许值            允许的特殊字符
秒(Seconds) 0~59的整数     , – * / 四个字符
分(Minutes) 0~59的整数      , – * / 四个字符
小时(Hours) 0~23的整数      , – * / 四个字符
日期(DayofMonth) 1~31的整数(但是你需要考虑你月的天数)  ,- * ? / L W C 八个字符
月份(Month) 1~12的整数或者 JAN-DEC  , – * / 四个字符
星期(DayofWeek) 1~7的整数或者 SUN-SAT (1=SUN)    , – * ? / L C # 八个字符
年(可选,留空)(Year) 1970~2099         , – * / 四个字符

 

一些例子:

(0)0/20 * * * * ? 表示每20秒 调整任务

(1)0 0 2 1 * ? 表示在每月的1日的凌晨2点调整任务

(2)0 15 10 ? * MON-FRI 表示周一到周五每天上午10:15执行作业

(3)0 15 10 ? 6L 2002-2006 表示2002-2006年的每个月的最后一个星期五上午10:15执行作

(4)0 0 10,14,16 * * ? 每天上午10点,下午2点,4点

(5)0 0/30 9-17 * * ? 朝九晚五工作时间内每半小时

(6)0 0 12 ? * WED 表示每个星期三中午12点

(7)0 0 12 * * ? 每天中午12点触发

(8)0 15 10 ? * * 每天上午10:15触发

(9)0 15 10 * * ? 每天上午10:15触发

(10)0 15 10 * * ? * 每天上午10:15触发

(11)0 15 10 * * ? 2005 2005年的每天上午10:15触发

(12)0 * 14 * * ? 在每天下午2点到下午2:59期间的每1分钟触发

(13)0 0/5 14 * * ? 在每天下午2点到下午2:55期间的每5分钟触发

(14)0 0/5 14,18 * * ? 在每天下午2点到2:55期间和下午6点到6:55期间的每5分钟触发

(15)0 0-5 14 * * ? 在每天下午2点到下午2:05期间的每1分钟触发

(16)0 10,44 14 ? 3 WED 每年三月的星期三的下午2:10和2:44触发

(17)0 15 10 ? * MON-FRI 周一至周五的上午10:15触发

(18)0 15 10 15 * ? 每月15日上午10:15触发

(19)0 15 10 L * ? 每月最后一日的上午10:15触发

(20)0 15 10 ? * 6L 每月的最后一个星期五上午10:15触发

(21)0 15 10 ? * 6L 2002-2005 2002年至2005年的每月的最后一个星期五上午10:15触发

(22)0 15 10 ? * 6#3 每月的第三个星期五上午10:15触发

 

 

cron表达式生成:

https://cron.qqe2.com/

 

Elastic Job介绍

Elastic-Job是一个分布式调度解决方案,由两个相互独立的子项目Elastic-Job-Lite和Elastic-Job-Cloud组成。

外部依赖Zk做注册中心,基于zk做任务调度

支持任务分片、支持弹性扩缩容

 

功能列表:

  • 分布式调度协调
  • 弹性扩容缩容
  • 失效转移
  • 错过执行作业重触发
  • 作业分片一致性,保证同一分片在分布式环境中仅一个执行实例
  • 自诊断并修复分布式不稳定造成的问题
  • 支持并行调度
  • 支持作业生命周期操作
  • 丰富的作业类型
  • Spring整合以及命名空间提供
  • 运维平台

 

本文主要跑下Elastic-Job-Lite的demo

 

Elastic Job 环境准备

首先java,maven就不说了

elastic job依赖zk,我们就先整个zk

先启个本地的ZK, 我的是直接在mac上整,当然云服务器更好。

 

brew install zookeeper

 

连接客户端

zkCli

 

再来个zk的可视化工具

下载地址:

https://issues.apache.org/jira/secure/attachment/12436620/ZooInspector.zip

下载完后解压缩

 

启动后是这个鸟样

于是zk就完活了

 

 

Spring Boot 整合 Elastic Job

引入依赖

<dependency>
  <groupId>com.purgeteam</groupId>
  <artifactId>elasticjob-spring-boot-starter</artifactId>
  <version>0.1.1.RELEASE</version>
</dependency>

 

配置 application.yml

spring:
  elasticjob:
    datasource: # job需要的记录数据源
      url: jdbc:mysql://39.97.47.254:3306/mydb?useUnicode=true&characterEncoding=utf-8&verifyServerCertificate=false&useSSL=false&requireSSL=false
      driver-class-name: com.mysql.cj.jdbc.Driver
      username: root
      password:
    regCenter: # 注册中心
      serverList: 127.0.0.1:2181
      namespace: elasticJobDemo

 

注册中心就是本地起的zk

 

定时任务:

package pers.wmx.springbootfreemarkerdemo.elasticjob;

import java.text.SimpleDateFormat;
import java.util.Date;

import org.springframework.stereotype.Component;

import com.dangdang.ddframe.job.api.ShardingContext;
import com.dangdang.ddframe.job.api.simple.SimpleJob;

/**
 * @author wangmingxin03
 * Created on 2021-07-08
 */
@Component
public class MySimpleJob implements SimpleJob {
    @Override
    public void execute(ShardingContext context) {
        System.out.println(
                String.format("分片项 ShardingItem: %s | 运行时间: %s | 线程ID: %s | 分片参数: %s ",
                        context.getShardingItem(),
                        new SimpleDateFormat("HH:mm:ss").format(new Date()),
                        Thread.currentThread().getId(),
                        context.getShardingParameter())
        );
    }
}

 

任务配置:

package pers.wmx.springbootfreemarkerdemo.elasticjob;

import javax.annotation.Resource;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import com.dangdang.ddframe.job.event.JobEventConfiguration;
import com.dangdang.ddframe.job.lite.api.JobScheduler;
import com.dangdang.ddframe.job.lite.config.LiteJobConfiguration;
import com.dangdang.ddframe.job.lite.spring.api.SpringJobScheduler;
import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperRegistryCenter;
import com.purgeteam.elasticjob.starter.util.ElasticJobUtils;

/**
 * @author wangmingxin03
 * Created on 2021-07-08
 */
@Configuration
public class MyJobConfig {
    // job 名称
    private static final String JOB_NAME = "MySimpleJob";

    // 定时器cron参数
    private static final String CRON = "0/5 * * * * ?";

    // 定时器分片
    private static final int SHARDING_TOTAL_COUNT = 2;

    // 分片参数
    private static final String SHARDING_ITEM_PARAMETERS = "0=Beijing,1=Shanghai";

    // 自定义参数
    private static final String JOB_PARAMETERS = "parameter";

    @Resource
    private ZookeeperRegistryCenter regCenter;

    @Resource
    private JobEventConfiguration jobEventConfiguration;


    @Bean(initMethod = "init")
    public JobScheduler mySimpleJobScheduler(final MySimpleJob mySimpleJob) {

        LiteJobConfiguration liteJobConfiguration = ElasticJobUtils
                .getLiteJobConfiguration(mySimpleJob.getClass(), JOB_NAME, CRON,
                        SHARDING_TOTAL_COUNT, SHARDING_ITEM_PARAMETERS, JOB_PARAMETERS);
        // 参数:1.定时器实例,2.注册中心类,3.LiteJobConfiguration,
        //     3.历史轨迹(不需要可以省略)
        return new SpringJobScheduler(mySimpleJob, regCenter, liteJobConfiguration, jobEventConfiguration);
    }
}

 

getLiteJobConfiguration

/**
     * 获取 {@link LiteJobConfiguration} 对象
     *
     * @param jobClass               定时器实现类
     * @param jobName                定时器名称
     * @param cron                   定时参数
     * @param shardingTotalCount     作业分片总数
     * @param shardingItemParameters 当前参数 可以为null
     * @param jobParameters          作业自定义参数 可以为null
     * @return {@link LiteJobConfiguration}
     */
    @SuppressWarnings("all")
    public static LiteJobConfiguration getLiteJobConfiguration(Class<? extends SimpleJob> jobClass,
                                                               final String jobName,
                                                               final String cron,
                                                               final int shardingTotalCount,
                                                               final String shardingItemParameters,
                                                               final String jobParameters) {

 

 

启起来Spring Boot

定时任务开始跑起来了

 

 

我拷贝一下工程,再起一个分片

看下日志变化

分片任务1:

分片任务2:

 

刚好是2个shard,获取了各自的分片参数

 

 

看下db存储:

 

最后再看下ZK存储

 

 

在看下zk存储的 job config

{“jobName”:”MySimpleJob”,”jobClass”:”pers.wmx.springbootfreemarkerdemo.elasticjob.MySimpleJob”,”jobType”:”SIMPLE”,”cron”:”0/5 * * * * ?”,”shardingTotalCount”:2,”shardingItemParameters”:”0\u003dBeijing,1\u003dShanghai,2\u003dGuangzhou”,”jobParameter”:”parameter”,”failover”:false,”misfire”:true,”description”:””,”jobProperties”:{“job_exception_handler”:”com.dangdang.ddframe.job.executor.handler.impl.DefaultJobExceptionHandler”,”executor_service_handler”:”com.dangdang.ddframe.job.executor.handler.impl.DefaultExecutorServiceHandler”},”monitorExecution”:true,”maxTimeDiffSeconds”:-1,”monitorPort”:-1,”jobShardingStrategyClass”:””,”reconcileIntervalMinutes”:10,”disabled”:false,”overwrite”:true}

 

 

 

转载请注明:汪明鑫的个人博客 » Elastic Job实践

喜欢 (4)

说点什么

您将是第一位评论人!

提醒
avatar
wpDiscuz