
Elastic Job Internals

Distributed · 汪明鑫 · 416 views · 0 comments

In the previous article, I set up the ZooKeeper and MySQL environments and ran an example integrating Spring Boot with Elastic Job.

Today we take a closer look at Elastic Job from the principle and source-code level. Elastic Job relies on ZK to register job metadata and job instances, and it also stores each job's sharding allocation in ZK.

Leader election for a job happens in ZK as well.

If a job instance goes down, ZK's watcher mechanism fires and triggers a re-allocation of that job's sharding items.

Under the hood, Elastic Job also builds on top of Quartz.

 

If you want to observe sharding in action, you can run multiple job instances of the previous article's example in IDEA.

 

 

Let's look in detail at the data Elastic Job stores in ZK.

Data in ZK is isolated per namespace.
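Putting the nodes described below together, the layout under one namespace looks roughly like this (a simplified sketch based on this post's demo; the IP and PID are made-up examples, and the exact children vary across versions):

```
/ejob-standalone                        # namespace
└── simpleJob                           # job name
    ├── config                          # job configuration
    ├── instances
    │   └── 192.168.1.10@-@12345        # running instance (ephemeral), IP+@-@+PID
    ├── leader
    │   ├── election                    # leader election
    │   ├── sharding                    # resharding flags
    │   └── failover                    # failover items (absent if none occurred)
    ├── servers
    │   └── 192.168.1.10                # one child per IP; write DISABLED to disable
    └── sharding
        ├── 0
        │   └── instance                # which instance owns item 0
        ├── 1
        └── ...
```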

  • instances node:

The deployed elastic-job instances under the same job. One machine can run multiple job instances (i.e., JAR processes). Each instance node is named IP+@-@+PID, and it exists only while the instance is running.

 

  • leader node:

Information about the job's leader, chosen via ZooKeeper leader election. In Elastic Job, job execution can be spread across different instances (nodes), but core coordination such as sharding must be done by the leader, so a leader is elected before the job runs. It has three children:

  1. election: leader election
  2. sharding: sharding coordination
  3. failover: failover; it is not shown here because no failover has occurred

 

  • servers node:

Job instance information, essentially the instances' IP addresses. Unlike instances, if multiple job instances run on the same machine, only one IP child node appears. Writing DISABLED into an IP node disables the job instances on that server.

 

  • sharding node:

The job's sharding information. Its children are the sharding item numbers, starting from 0; the total number of shards is set in the job configuration. Each sharding item has child nodes that record and control that shard's runtime state; the most important one is instance.
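The IP+@-@+PID instance name mentioned under the instances node above can be derived in plain Java, e.g. (an illustrative sketch, not the library's actual code):

```java
import java.lang.management.ManagementFactory;
import java.net.InetAddress;

// Illustrative sketch: building an "IP@-@PID" id like the ones under the instances node.
public class InstanceIdSketch {

    static String instanceId() throws Exception {
        String ip = InetAddress.getLocalHost().getHostAddress();
        // RuntimeMXBean#getName conventionally returns "pid@hostname" on HotSpot JVMs.
        String pid = ManagementFactory.getRuntimeMXBean().getName().split("@")[0];
        return ip + "@-@" + pid;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(instanceId()); // e.g. 192.168.1.10@-@12345
    }
}
```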

 

 

 

To make the code easier to debug, let's put together another simple demo.

@Component
public class MySimpleJob implements SimpleJob {
    @Override
    public void execute(ShardingContext context) {
        System.out.println(
                String.format("ShardingItem: %s | Time: %s | Thread ID: %s | Sharding parameter: %s | Job parameter: %s",
                        context.getShardingItem(),
                        new SimpleDateFormat("HH:mm:ss").format(new Date()),
                        Thread.currentThread().getId(),
                        context.getShardingParameter(),
                        context.getJobParameter())
        );
    }
}

 

public class SimpleJobTest {
    // Run it with plain Java
    public static void main(String[] args) {
        // ZooKeeper registry center
        CoordinatorRegistryCenter registryCenter = new ZookeeperRegistryCenter(
                new ZookeeperConfiguration("localhost:2181", "ejob-standalone")
        );
        registryCenter.init();

        // Data source for the job-event persistence strategy
        DruidDataSource dataSource = new DruidDataSource();
        dataSource.setDriverClassName("com.mysql.jdbc.Driver");
        dataSource.setUrl("jdbc:mysql://localhost:3306/mydb?useUnicode=true&characterEncoding=utf-8");
        dataSource.setUsername("root");
        dataSource.setPassword("wmx123");
        JobEventConfiguration jobEventConfig = new JobEventRdbConfiguration(dataSource);

        // Core job config: fire every 20 seconds, 4 shards
        JobCoreConfiguration coreConfig = JobCoreConfiguration.newBuilder(
                "simpleJob", "0/20 * * * * ?", 4)
                .build();

        SimpleJobConfiguration simpleJobConfiguration = new SimpleJobConfiguration(
                coreConfig, MySimpleJob.class.getCanonicalName()
        );

        // Root job configuration
        LiteJobConfiguration jobRootConfig = LiteJobConfiguration.newBuilder(simpleJobConfiguration)
                .overwrite(true)
                .build();

        // Build the job and start scheduling
        new JobScheduler(registryCenter, jobRootConfig, jobEventConfig).init();
    }
}

 

I sketched out the class diagram below.

 

The job scheduler is clearly the central player:

it is associated with the registry center ZookeeperRegistryCenter, the root job configuration LiteJobConfiguration, and JobEventConfiguration.

LiteJobConfiguration composes a SimpleJobConfiguration, which in turn composes the job itself and its execution parameters.

 

As the demo shows, everything up front is initialization: the ZK connection, database resources, the job configuration, the execution schedule, and so on.

The key step is kicking off scheduling: new JobScheduler(registryCenter, jobRootConfig, jobEventConfig).init();

 

 

 

/**
 * Initialize the job.
 */
public void init() {
  LiteJobConfiguration liteJobConfigFromRegCenter = schedulerFacade.updateJobConfiguration(liteJobConfig);
  // Set the total shard count
  JobRegistry.getInstance().setCurrentShardingTotalCount(liteJobConfigFromRegCenter.getJobName(), liteJobConfigFromRegCenter.getTypeConfig().getCoreConfig().getShardingTotalCount());
  // Build the job and create the scheduler
  JobScheduleController jobScheduleController = new JobScheduleController(
    createScheduler(), createJobDetail(liteJobConfigFromRegCenter.getTypeConfig().getJobClass()), liteJobConfigFromRegCenter.getJobName());
  // Register the job in ZK
  JobRegistry.getInstance().registerJob(liteJobConfigFromRegCenter.getJobName(), jobScheduleController, regCenter);
  // Persist startup info and run leader election
  schedulerFacade.registerStartUpInfo(!liteJobConfigFromRegCenter.isDisabled());
  // Start the scheduler
  jobScheduleController.scheduleJob(liteJobConfigFromRegCenter.getTypeConfig().getCoreConfig().getCron());
}

 

JobRegistry is Elastic Job's local (in-JVM) registry: a set of maps holding job-related state.

@NoArgsConstructor(access = AccessLevel.PRIVATE)
public final class JobRegistry {
    
    private static volatile JobRegistry instance;
    
    // Schedule controllers
    private Map<String, JobScheduleController> schedulerMap = new ConcurrentHashMap<>();
    
    // Registry centers
    private Map<String, CoordinatorRegistryCenter> regCenterMap = new ConcurrentHashMap<>();
    
    // Job instances
    private Map<String, JobInstance> jobInstanceMap = new ConcurrentHashMap<>();
    
    // Whether each job is currently running
    private Map<String, Boolean> jobRunningMap = new ConcurrentHashMap<>();
    
    // Current total shard count of each job
    private Map<String, Integer> currentShardingTotalCountMap = new ConcurrentHashMap<>();

 

    /**
     * Get the job registry instance.
     * 
     * @return the job registry instance
     */
    public static JobRegistry getInstance() {
        if (null == instance) {
            synchronized (JobRegistry.class) {
                if (null == instance) {
                    instance = new JobRegistry();
                }
            }
        }
        return instance;
    }

getInstance uses the singleton pattern with double-checked locking.
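In isolation, the double-checked locking idiom looks like this (a minimal standalone sketch; the volatile on the field is what prevents another thread from observing a partially constructed instance after the unsynchronized first check):

```java
// Minimal double-checked locking singleton, mirroring JobRegistry#getInstance.
public class DclSingleton {

    // volatile is required: it forbids the reordering that could publish
    // the reference before the constructor has finished running.
    private static volatile DclSingleton instance;

    private DclSingleton() { }

    public static DclSingleton getInstance() {
        if (instance == null) {                      // first check, lock-free fast path
            synchronized (DclSingleton.class) {
                if (instance == null) {              // second check, under the lock
                    instance = new DclSingleton();
                }
            }
        }
        return instance;
    }

    public static void main(String[] args) {
        System.out.println(getInstance() == getInstance()); // true
    }
}
```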

 

JobScheduleController is really a wrapper around Quartz; the underlying job scheduling is still done by Quartz.

The job schedule controller:

@RequiredArgsConstructor
public final class JobScheduleController {
    
    private final Scheduler scheduler;
    
    private final JobDetail jobDetail;
    
    private final String triggerIdentity;
    
    /**
     * Schedule the job.
     * 
     * @param cron the CRON expression
     */
    public void scheduleJob(final String cron) {
        try {
            if (!scheduler.checkExists(jobDetail.getKey())) {
                scheduler.scheduleJob(jobDetail, createTrigger(cron));
            }
            scheduler.start();
        } catch (final SchedulerException ex) {
            throw new JobSystemException(ex);
        }
    }
    
    // ...
}

 

Now for the other core method called from init: registerStartUpInfo.

/**
 * Register job startup info.
 * 
 * @param enabled whether the job is enabled
 */
public void registerStartUpInfo(final boolean enabled) {
  // Start all listeners; they watch for changes to ZK nodes
  listenerManager.startAllListeners();
  // Leader election
  leaderService.electLeader();
  // Persist server info (written to ZK)
  serverService.persistOnline(enabled);
  // Persist instance info (written to ZK)
  instanceService.persistOnline();
  // Set the resharding flag
  shardingService.setReshardingFlag();
  // Start the monitor listener
  monitorService.listen();
  // Self-diagnosis and repair, to keep the local node consistent with ZK
  if (!reconcileService.isRunning()) {
    reconcileService.startAsync();
  }
}

 

Next, let's see how a job execution is actually triggered, and where the entry point is.

private JobDetail createJobDetail(final String jobClass) {
        JobDetail result = JobBuilder.newJob(LiteJob.class).withIdentity(liteJobConfig.getJobName()).build();
        result.getJobDataMap().put(JOB_FACADE_DATA_MAP_KEY, jobFacade);
        Optional<ElasticJob> elasticJobInstance = createElasticJobInstance();
        if (elasticJobInstance.isPresent()) {
            result.getJobDataMap().put(ELASTIC_JOB_DATA_MAP_KEY, elasticJobInstance.get());
        } else if (!jobClass.equals(ScriptJob.class.getCanonicalName())) {
            try {
                result.getJobDataMap().put(ELASTIC_JOB_DATA_MAP_KEY, Class.forName(jobClass).newInstance());
            } catch (final ReflectiveOperationException ex) {
                throw new JobConfigurationException("Elastic-Job: Job class '%s' can not initialize.", jobClass);
            }
        }
        return result;
    }

 

JobDetail result = JobBuilder.newJob(LiteJob.class).withIdentity(liteJobConfig.getJobName()).build();

Notice that LiteJob is the job class handed to Quartz here.

public final class LiteJob implements Job {
    
    @Setter
    private ElasticJob elasticJob;
    
    @Setter
    private JobFacade jobFacade;
    
    @Override
    public void execute(final JobExecutionContext context) throws JobExecutionException {
        JobExecutorFactory.getJobExecutor(elasticJob, jobFacade).execute();
    }
}

 

Every scheduled fire goes through Quartz's scheduler thread:

org.quartz.core.QuartzSchedulerThread

org.quartz.core.JobRunShell#initialize

 

public void initialize(QuartzScheduler sched)
        throws SchedulerException {
        this.qs = sched;

        Job job = null;
        JobDetail jobDetail = firedTriggerBundle.getJobDetail();

        try {
            job = sched.getJobFactory().newJob(firedTriggerBundle, scheduler);
        } catch (SchedulerException se) {
            sched.notifySchedulerListenersError(
                    "An error occured instantiating job to be executed. job= '"
                            + jobDetail.getKey() + "'", se);
            throw se;
        } catch (Throwable ncdfe) { // such as NoClassDefFoundError
            SchedulerException se = new SchedulerException(
                    "Problem instantiating class '"
                            + jobDetail.getJobClass().getName() + "' - ", ncdfe);
            sched.notifySchedulerListenersError(
                    "An error occured instantiating job to be executed. job= '"
                            + jobDetail.getKey() + "'", se);
            throw se;
        }

        this.jec = new JobExecutionContextImpl(scheduler, firedTriggerBundle, job);
    }

 

initialize calls sched.getJobFactory().newJob(firedTriggerBundle, scheduler);

 

public class PropertySettingJobFactory extends SimpleJobFactory {
    private boolean warnIfNotFound = false;
    private boolean throwIfNotFound = false;
    
    @Override
    public Job newJob(TriggerFiredBundle bundle, Scheduler scheduler) throws SchedulerException {

        Job job = super.newJob(bundle, scheduler);
        
        JobDataMap jobDataMap = new JobDataMap();
        jobDataMap.putAll(scheduler.getContext());
        jobDataMap.putAll(bundle.getJobDetail().getJobDataMap());
        jobDataMap.putAll(bundle.getTrigger().getJobDataMap());

        setBeanProps(job, jobDataMap);
        
        return job;
    }

 

public Job newJob(TriggerFiredBundle bundle, Scheduler Scheduler) throws SchedulerException {

        JobDetail jobDetail = bundle.getJobDetail();
        Class<? extends Job> jobClass = jobDetail.getJobClass();
        try {
            if(log.isDebugEnabled()) {
                log.debug(
                    "Producing instance of Job '" + jobDetail.getKey() + 
                    "', class=" + jobClass.getName());
            }
            
            return jobClass.newInstance();
        } catch (Exception e) {
            SchedulerException se = new SchedulerException(
                    "Problem instantiating class '"
                            + jobDetail.getJobClass().getName() + "'", e);
            throw se;
        }
    }

Here the Job instance is created from jobClass via reflection; in our case that is a LiteJob instance.

 

The actual execution runs on a worker thread in org.quartz.core.JobRunShell#run,

which calls job.execute(jec),

which lands in com.dangdang.ddframe.job.lite.internal.schedule.LiteJob#execute. From here on, this is where our attention goes.

 

public final class LiteJob implements Job {
    
    @Setter
    private ElasticJob elasticJob;
    
    @Setter
    private JobFacade jobFacade;
    
    @Override
    public void execute(final JobExecutionContext context) throws JobExecutionException {
        JobExecutorFactory.getJobExecutor(elasticJob, jobFacade).execute();
    }
}

 

@NoArgsConstructor(access = AccessLevel.PRIVATE)
public final class JobExecutorFactory {
    
    /**
     * Get the job executor.
     *
     * @param elasticJob the distributed elastic job
     * @param jobFacade the facade over the job's internal services
     * @return the job executor
     */
    @SuppressWarnings("unchecked")
    public static AbstractElasticJobExecutor getJobExecutor(final ElasticJob elasticJob, final JobFacade jobFacade) {
        if (null == elasticJob) {
            return new ScriptJobExecutor(jobFacade);
        }
        if (elasticJob instanceof SimpleJob) {
            return new SimpleJobExecutor((SimpleJob) elasticJob, jobFacade);
        }
        if (elasticJob instanceof DataflowJob) {
            return new DataflowJobExecutor((DataflowJob) elasticJob, jobFacade);
        }
        throw new JobConfigurationException("Cannot support job type '%s'", elasticJob.getClass().getCanonicalName());
    }
}

 

This is the factory pattern.

The job we wrote is a SimpleJob,

so we end up in SimpleJobExecutor.

 

public final class SimpleJobExecutor extends AbstractElasticJobExecutor {
    
    private final SimpleJob simpleJob;
    
    public SimpleJobExecutor(final SimpleJob simpleJob, final JobFacade jobFacade) {
        super(jobFacade);
        this.simpleJob = simpleJob;
    }
    
    @Override
    protected void process(final ShardingContext shardingContext) {
        simpleJob.execute(shardingContext);
    }
}

 

Look at com.dangdang.ddframe.job.executor.AbstractElasticJobExecutor#execute(),

which calls the private overload com.dangdang.ddframe.job.executor.AbstractElasticJobExecutor#execute(com.dangdang.ddframe.job.executor.ShardingContexts, com.dangdang.ddframe.job.event.type.JobExecutionEvent.ExecutionSource),

which in turn calls com.dangdang.ddframe.job.executor.AbstractElasticJobExecutor#process(com.dangdang.ddframe.job.executor.ShardingContexts, com.dangdang.ddframe.job.event.type.JobExecutionEvent.ExecutionSource).

 

The ShardingContexts sharding parameters are passed all the way down, until we finally reach:

private void process(final ShardingContexts shardingContexts, final int item, final JobExecutionEvent startEvent) {
        if (shardingContexts.isAllowSendJobEvent()) {
            jobFacade.postJobExecutionEvent(startEvent);
        }
        log.trace("Job '{}' executing, item is: '{}'.", jobName, item);
        JobExecutionEvent completeEvent;
        try {
            process(new ShardingContext(shardingContexts, item));
            completeEvent = startEvent.executionSuccess();
            log.trace("Job '{}' executed, item is: '{}'.", jobName, item);
            if (shardingContexts.isAllowSendJobEvent()) {
                jobFacade.postJobExecutionEvent(completeEvent);
            }
            // CHECKSTYLE:OFF
        } catch (final Throwable cause) {
            // CHECKSTYLE:ON
            completeEvent = startEvent.executionFailure(cause);
            jobFacade.postJobExecutionEvent(completeEvent);
            itemErrorMessages.put(item, ExceptionUtil.transform(cause));
            jobExceptionHandler.handleException(jobName, cause);
        }
    }

 

which then calls

    protected abstract void process(ShardingContext shardingContext);

 

This is the template method pattern: the concrete process is implemented by subclasses.

In the subclass SimpleJobExecutor, process simply calls the business job's execute method:

public final class SimpleJobExecutor extends AbstractElasticJobExecutor {
    
    private final SimpleJob simpleJob;
    
    public SimpleJobExecutor(final SimpleJob simpleJob, final JobFacade jobFacade) {
        super(jobFacade);
        this.simpleJob = simpleJob;
    }
    
    @Override
    protected void process(final ShardingContext shardingContext) {
        simpleJob.execute(shardingContext);
    }
}

 

@Component
public class MySimpleJob implements SimpleJob {
    @Override
    public void execute(ShardingContext context) {
        System.out.println(
                String.format("ShardingItem: %s | Time: %s | Thread ID: %s | Sharding parameter: %s | Job parameter: %s",
                        context.getShardingItem(),
                        new SimpleDateFormat("HH:mm:ss").format(new Date()),
                        Thread.currentThread().getId(),
                        context.getShardingParameter(),
                        context.getJobParameter())
        );
    }
}

 

That completes the job-execution flow. So how are the sharding parameters allocated?

Look at how com.dangdang.ddframe.job.executor.AbstractElasticJobExecutor#execute() obtains them:

ShardingContexts shardingContexts = jobFacade.getShardingContexts();

 

@Override
    public ShardingContexts getShardingContexts() {
        boolean isFailover = configService.load(true).isFailover();
        if (isFailover) {
            List<Integer> failoverShardingItems = failoverService.getLocalFailoverItems();
            if (!failoverShardingItems.isEmpty()) {
                return executionContextService.getJobShardingContext(failoverShardingItems);
            }
        }
        shardingService.shardingIfNecessary();
        List<Integer> shardingItems = shardingService.getLocalShardingItems();
        if (isFailover) {
            shardingItems.removeAll(failoverService.getLocalTakeOffItems());
        }
        shardingItems.removeAll(executionService.getDisabledItems(shardingItems));
        return executionContextService.getJobShardingContext(shardingItems);
    }

 

Let's look at shardingIfNecessary:

/**
     * Shard the job if resharding is needed and the current node is the leader.
     * 
     * <p>
     * If no node is currently available, do not shard.
     * </p>
     */
    public void shardingIfNecessary() {
        // Get the available job instances
        List<JobInstance> availableJobInstances = instanceService.getAvailableJobInstances();
        if (!isNeedSharding() || availableJobInstances.isEmpty()) {
            return;
        }
        if (!leaderService.isLeaderUntilBlock()) {
            // Not the leader: wait until sharding is complete
            blockUntilShardingCompleted();
            return;
        }
        waitingOtherJobCompleted();
        LiteJobConfiguration liteJobConfig = configService.load(false);
        int shardingTotalCount = liteJobConfig.getTypeConfig().getCoreConfig().getShardingTotalCount();
        log.debug("Job '{}' sharding begin.", jobName);
        jobNodeStorage.fillEphemeralJobNode(ShardingNode.PROCESSING, "");
        resetShardingInfo(shardingTotalCount);

        // Get the sharding allocation strategy
        JobShardingStrategy jobShardingStrategy = JobShardingStrategyFactory.getStrategy(liteJobConfig.getJobShardingStrategyClass());

        // Persist the sharding allocation result
        jobNodeStorage.executeInTransaction(new PersistShardingInfoTransactionExecutionCallback(jobShardingStrategy.sharding(availableJobInstances, jobName, shardingTotalCount)));
        log.debug("Job '{}' sharding complete.", jobName);
    }

 

The sharding allocation is computed using the strategy pattern plus a factory:

JobShardingStrategyFactory.getStrategy(liteJobConfig.getJobShardingStrategyClass())

 

/**
     * Get the job sharding strategy instance.
     * 
     * @param jobShardingStrategyClassName the job sharding strategy class name
     * @return the job sharding strategy instance
     */
    public static JobShardingStrategy getStrategy(final String jobShardingStrategyClassName) {
        if (Strings.isNullOrEmpty(jobShardingStrategyClassName)) {
            return new AverageAllocationJobShardingStrategy();
        }
        try {
            Class<?> jobShardingStrategyClass = Class.forName(jobShardingStrategyClassName);
            if (!JobShardingStrategy.class.isAssignableFrom(jobShardingStrategyClass)) {
                throw new JobConfigurationException("Class '%s' is not job strategy class", jobShardingStrategyClassName);
            }
            return (JobShardingStrategy) jobShardingStrategyClass.newInstance();
        } catch (final ClassNotFoundException | InstantiationException | IllegalAccessException ex) {
            throw new JobConfigurationException("Sharding strategy class '%s' config error, message details are '%s'", jobShardingStrategyClassName, ex.getMessage());
        }
    }

 

jobShardingStrategy.sharding(availableJobInstances, jobName, shardingTotalCount)

Three sharding strategies are provided:

 

/**
 * Sharding strategy based on an average-allocation algorithm.
 * 
 * <p>
 * If the shards cannot be divided evenly, the leftover shards are appended in turn to the lowest-numbered servers.
 * E.g.:
 * 1. 3 servers, 9 shards: each server gets 1=[0,1,2], 2=[3,4,5], 3=[6,7,8].
 * 2. 3 servers, 8 shards: each server gets 1=[0,1,6], 2=[2,3,7], 3=[4,5].
 * 3. 3 servers, 10 shards: each server gets 1=[0,1,2,9], 2=[3,4,5], 3=[6,7,8].
 * </p>
 * 
 * @author zhangliang
 */
public final class AverageAllocationJobShardingStrategy implements JobShardingStrategy {
/**
 * Sharding strategy that sorts IPs ascending or descending depending on whether the hash of the job name is odd or even.
 * 
 * <p>
 * If the job name's hash is odd, IPs are sorted ascending;
 * if it is even, IPs are sorted descending.
 * This spreads the load of different jobs evenly across servers.
 * E.g.:
 * 1. 3 servers, 2 shards, job name hash is odd: 1=[0], 2=[1], 3=[].
 * 2. 3 servers, 2 shards, job name hash is even: 3=[0], 2=[1], 1=[].
 * </p>
 * 
 * @author zhangliang
 */
public final class OdevitySortByNameJobShardingStrategy implements JobShardingStrategy {
/**
 * Sharding strategy that rotates the server list based on the hash of the job name.
 * 
 * @author weishubin
 */
public final class RotateServerByNameJobShardingStrategy implements JobShardingStrategy {

 

The strategy tells each instance exactly which sharding items and parameters it owns, which is how multiple instances process their own shards in parallel.

The sharding allocation is also stored in ZK.
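The average-allocation behavior described in the Javadoc above can be sketched as follows (a simplified re-implementation keyed by server number instead of JobInstance; not the library's actual code):

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Sketch of the average-allocation algorithm: each server gets total/serverCount
// consecutive items; leftover items are appended one by one to the lowest-numbered servers.
public class AverageShardingSketch {

    static Map<Integer, List<Integer>> sharding(int serverCount, int total) {
        Map<Integer, List<Integer>> result = new LinkedHashMap<>();
        int base = total / serverCount;
        int item = 0;
        // consecutive base allocation
        for (int server = 1; server <= serverCount; server++) {
            List<Integer> items = new ArrayList<>();
            for (int i = 0; i < base; i++) {
                items.add(item++);
            }
            result.put(server, items);
        }
        // append the remainder to the lowest-numbered servers, one item each
        for (int server = 1; server <= total % serverCount; server++) {
            result.get(server).add(item++);
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(sharding(3, 9));  // {1=[0, 1, 2], 2=[3, 4, 5], 3=[6, 7, 8]}
        System.out.println(sharding(3, 8));  // {1=[0, 1, 6], 2=[2, 3, 7], 3=[4, 5]}
        System.out.println(sharding(3, 10)); // {1=[0, 1, 2, 9], 2=[3, 4, 5], 3=[6, 7, 8]}
    }
}
```

Running it reproduces the three examples from the strategy's Javadoc.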

 

Elastic Job also makes heavy use of the observer pattern,

for example publishing and subscribing to job events through Google Guava's EventBus.

While a job is running, for instance, it publishes job-status events:

if (shardingContexts.isAllowSendJobEvent()) {
    jobFacade.postJobStatusTraceEvent(taskId, State.TASK_RUNNING, "");
}
/**
 * Job trace event bus.
 * 
 * @author zhangliang
 * @author caohao
 */
@Slf4j
public final class JobEventBus {
    
    private final JobEventConfiguration jobEventConfig;
    
    private final ExecutorServiceObject executorServiceObject;
    
    private final EventBus eventBus;
    
    private boolean isRegistered;
    
    public JobEventBus() {
        jobEventConfig = null;
        executorServiceObject = null;
        eventBus = null;
    }
    
    public JobEventBus(final JobEventConfiguration jobEventConfig) {
        this.jobEventConfig = jobEventConfig;
        executorServiceObject = new ExecutorServiceObject("job-event", Runtime.getRuntime().availableProcessors() * 2);
        eventBus = new AsyncEventBus(executorServiceObject.createExecutorService());
        register();
    }
    
    private void register() {
        try {
            eventBus.register(jobEventConfig.createJobEventListener());
            isRegistered = true;
        } catch (final JobEventListenerConfigurationException ex) {
            log.error("Elastic job: create JobEventListener failure, error is: ", ex);
        }
    }
    
    /**
     * Publish an event.
     *
     * @param event the job event
     */
    public void post(final JobEvent event) {
        if (isRegistered && !executorServiceObject.isShutdown()) {
            eventBus.post(event);
        }
    }
}

 

public interface JobEventListener extends JobEventIdentity {
    
    /**
     * Listen for job execution events.
     *
     * @param jobExecutionEvent the job execution event
     */
    @Subscribe
    @AllowConcurrentEvents
    void listen(JobExecutionEvent jobExecutionEvent);
    
    /**
     * Listen for job status trace events.
     *
     * @param jobStatusTraceEvent the job status trace event
     */
    @Subscribe
    @AllowConcurrentEvents
    void listen(JobStatusTraceEvent jobStatusTraceEvent);
}

 

The parameter type of a method annotated with @Subscribe is the event type that method listens for.
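In isolation, the publish/subscribe flow looks like this tiny stand-in for Guava's EventBus (MiniEventBus is hypothetical and deliberately simplified; the real Guava API discovers @Subscribe-annotated methods via reflection and can dispatch asynchronously):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Minimal observer-pattern sketch: listeners keyed by event type, like EventBus.
public class MiniEventBus {

    private final Map<Class<?>, List<Consumer<Object>>> listeners = new HashMap<>();

    // register() plays the role of Guava's @Subscribe discovery
    @SuppressWarnings("unchecked")
    public <T> void register(Class<T> eventType, Consumer<T> listener) {
        listeners.computeIfAbsent(eventType, k -> new ArrayList<>()).add((Consumer<Object>) listener);
    }

    // post() delivers the event to every listener registered for its type
    public void post(Object event) {
        listeners.getOrDefault(event.getClass(), List.of()).forEach(l -> l.accept(event));
    }

    public static void main(String[] args) {
        MiniEventBus bus = new MiniEventBus();
        List<String> received = new ArrayList<>();
        bus.register(String.class, received::add);  // subscribe, like an @Subscribe method
        bus.post("TASK_RUNNING");                   // publish, like postJobStatusTraceEvent
        System.out.println(received);               // [TASK_RUNNING]
    }
}
```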

 

During job startup, Elastic Job also uses ZK's watcher mechanism to register a number of listeners with ZK:

/**
     * Start all listeners.
     */
    public void startAllListeners() {
        electionListenerManager.start();
        shardingListenerManager.start();
        failoverListenerManager.start();
        monitorExecutionListenerManager.start();
        shutdownListenerManager.start();
        triggerListenerManager.start();
        rescheduleListenerManager.start();
        guaranteeListenerManager.start();
        jobNodeStorage.addConnectionStateListener(regCenterConnectionStateListener);
    }

Let's focus on failoverListenerManager

and see how Elastic Job handles failover.

Failover means that after finishing its own work, a server proactively grabs unassigned shards, and when a server goes offline, an available server is found to take over and run its shards.

 

Two listeners are added on top of ZK Curator.

JobCrashedJobListener watches for crashed instances:

class JobCrashedJobListener extends AbstractJobListener {
        
        @Override
        protected void dataChanged(final String path, final Type eventType, final String data) {
            if (isFailoverEnabled() && Type.NODE_REMOVED == eventType && instanceNode.isInstancePath(path)) {

                // Get the ID of the crashed job instance
                String jobInstanceId = path.substring(instanceNode.getInstanceFullPath().length() + 1);
                if (jobInstanceId.equals(JobRegistry.getInstance().getJobInstance(jobName).getJobInstanceId())) {
                    return;
                }
                
                // Get the crashed node's failover items
                List<Integer> failoverItems = failoverService.getFailoverItems(jobInstanceId);
                if (!failoverItems.isEmpty()) {
                    for (int each : failoverItems) {
                        failoverService.setCrashedFailoverFlag(each);
                        failoverService.failoverIfNecessary();
                    }
                } else {
                    // Initially the failover items are empty, so fall back to the items previously assigned to the instance
                    for (int each : shardingService.getShardingItems(jobInstanceId)) {
                        failoverService.setCrashedFailoverFlag(each);
                        failoverService.failoverIfNecessary();
                    }
                }
            }
        }
    }

 

setCrashedFailoverFlag

writes the failed sharding items to ZK at {jobName}/leader/failover/items/{item}, marking them as items that other job instances should take over.

 

failoverIfNecessary

then performs the failover:

 /**
     * If failover is needed, perform job failover.
     */
    public void failoverIfNecessary() {
        if (needFailover()) {
            jobNodeStorage.executeInLeader(FailoverNode.LATCH, new FailoverLeaderExecutionCallback());
        }
    }
    
    private boolean needFailover() {
        return jobNodeStorage.isJobNodeExisted(FailoverNode.ITEMS_ROOT) && !jobNodeStorage.getJobNodeChildrenKeys(FailoverNode.ITEMS_ROOT).isEmpty()
                && !JobRegistry.getInstance().isJobRunning(jobName);
    }

 

class FailoverLeaderExecutionCallback implements LeaderExecutionCallback {
        
        @Override
        public void execute() {
            if (JobRegistry.getInstance().isShutdown(jobName) || !needFailover()) {
                return;
            }
            
            // Get a crashed sharding item from ${JOB_NAME}/leader/failover/items/${ITEM_ID}
            int crashedItem = Integer.parseInt(jobNodeStorage.getJobNodeChildrenKeys(FailoverNode.ITEMS_ROOT).get(0));
            log.debug("Failover job '{}' begin, crashed item '{}'", jobName, crashedItem);

            // Set ${JOB_NAME}/sharding/${ITEM_ID}/failover to the current instance, i.e., the node that pulled
            // the failed item takes it over itself (a distributed lock guarantees only one instance wins the pull)
            jobNodeStorage.fillEphemeralJobNode(FailoverNode.getExecutionFailoverNode(crashedItem), JobRegistry.getInstance().getJobInstance(jobName).getJobInstanceId());
            
            // Remove the ${JOB_NAME}/leader/failover/items/${ITEM_ID} entry
            jobNodeStorage.removeJobNodeIfExisted(FailoverNode.getItemsNode(crashedItem));
            // TODO should not use triggerJob; scheduling should go through the executor uniformly
            JobScheduleController jobScheduleController = JobRegistry.getInstance().getJobScheduleController(jobName);
            if (null != jobScheduleController) {
                jobScheduleController.triggerJob();
            }
        }
    }

 

OK. At this point another instance has recorded the items that need failover; the actual execution of those items still happens when the job next fetches its sharding info.

When the sharding info is fetched during job execution, if failover is enabled, the run will first execute the sharding items that were failed over to the current node.

The failover code is honestly pretty hard to follow...

 

 

When reprinting, please credit: 汪明鑫's personal blog » Elastic Job Internals
