Java定时器演进过程和生产级分布式任务调度ElasticJob代码实战

发布于 2022年 01月 24日 00:36

为何要使用分布式任务调度

**本人博客网站 **IT小神 www.itxiaoshen.com

演示项目源码地址** https://gitee.com/yongzhebuju/spring-task **

在企业系统开发过程中难免少不了要使用定时任务来进行定时触发执行,对于非分布式环境系统的单一应用来说则非常容易解决,我们只需要在系统中内部集成一些开源的调度库配置定时触发即可;但是随着企业的系统越来越多,逐步从单一应用慢慢演变为微服务,在分布式系统中常见的任务重复执行、任务负载均衡、统一任务调度、任务故障转移、任务监控和报警等一些列的问题都是需要在分布式系统中进行解决的,分布式任务调度则应运而生

Java定时常用方式

基础理论原理

很多人写过基于线程的while+sleep来实现定时任务完成一些定时后台任务,而Jdk原生也有提供定时器实现;一般定时器实现底层有下面几种原理,主要涉及数据结构和算法的应用

  • 小顶堆
    • 堆看作一个数组,也可以被看作一个完全二叉树,通俗来讲堆其实就是利用完全二叉树的结构来维护的一维数组
    • 每个结点的值都小于或等于其左右孩子结点的值
    • t通过建堆和堆化操作,从定时任务上使用每次可以从堆顶取出最近一个需要执行的任务
  • 时间轮算法(顾名思义就是以时间)
    • 链表或数组实现时间轮
      • 通过while true sleep然后遍历数组,每个数组下标放置一个链表,而这个链表放置定时任务,只要遍历到就取出执行
    • round型时间轮
      • 任务记录一个round值,遍历到就减1,为0时取出执行
      • 需要遍历所有任务,效率较低
    • 分层时间轮
      • 使用多个不同维度的时间轮
        • 天轮,记录几点执行
        • 月轮,记录几号执行
      • 当在月轮遍历好了取出放到天轮,以这样方式时间几月几号执行

Jdk Timer

Jdk的timer核心实现

  • 最小堆:queue,存放TimerTask
  • 任务线程:TimerThread,任务执行线程,继承自Thread基类,死循环判断是否有任务需要执行
  • 单线程:执行任务,任务可能会相互阻塞
    • schedule
    • scheduleAtFixedRate

package com.itxs.timer;

import java.util.TimerTask;  

public class MyTimerTask extends TimerTask {

    @Override
    public void run() {
        System.out.println("hello my timer task");
    }
}

package com.itxs.timer;

import org.joda.time.DateTime; //joda-time日期类型库

import java.util.Timer;

public class JdkTimer {
    public static void main(String[] args) {
        Timer timer = new Timer();
        //指定时间点执行
        timer.schedule(new MyTimerTask(),new DateTime(2021,8,26,17,19,30).toDate());
        //延迟两秒执行,然后再每个3秒执行
        timer.schedule(new MyTimerTask(),2000,3000);
    }
}

Jdk定时任务线程池

核心实现也是小顶堆,无界队列,可以使用多线程执行任务,有Leader-Follower模式,避免没必要阻塞和唤醒操作,节省系统资源

package com.itxs.scheduler;

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class SchedulerThreadPool {
    public static void main(String[] args) {
        ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(5);
        //延迟3秒执行,执行一次
        scheduledExecutorService.schedule(new MySchedulerThreadPoolTask(),3,TimeUnit.SECONDS);
        //延迟3秒执行,之后每隔十秒执行
        scheduledExecutorService.scheduleAtFixedRate(new MySchedulerThreadPoolTask(),3,10,TimeUnit.SECONDS);
    }
}

package com.itxs.scheduler;

public class MySchedulerThreadPoolTask implements Runnable{
    @Override
    public void run() {
        System.out.println("MySchedulerThreadPoolTask");
    }
}

Spring Task

下面我们借助Spring Boot来演示下Spring Task,配置为多线程模式

package com.itxs.config;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.TaskScheduler;

import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;

@Configuration
public class ScheduleConfig {

    @Bean
    public TaskScheduler taskScheduler(){
        ThreadPoolTaskScheduler threadPoolTaskScheduler = new ThreadPoolTaskScheduler();
        threadPoolTaskScheduler.setPoolSize(10);
        threadPoolTaskScheduler.setThreadNamePrefix("spring-task-thread");
        return threadPoolTaskScheduler;
    }
}
package com.itxs.scheduler;

import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

@Component
public class MySpringTask {

    @Scheduled(cron = "0/30 * * * * ?")
    private void process(){
        System.out.println("MySpringTask:"+Thread.currentThread());
    }
}
package com.itxs;


import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableScheduling;

@SpringBootApplication
@EnableScheduling //注意需要在Spring Boot启动类上加开启Spring任务的注解
public class SpringTaskApplication {
    public static void main(String[] args) {
        new SpringApplication().run(SpringTaskApplication.class,args);
    }
}

Quartz定时任务框架

定义

Quartz是一个功能丰富的开源作业调度库,可以集成到几乎任何Java应用程序中——从最小的独立应用程序到最大的电子商务系统。Quartz可用于创建简单或复杂的调度,以执行数十个、数百个甚至数万个作业;这些作业的任务被定义为标准Java组件,这些组件实际上可以执行您编程让它们执行的任何事情。

运行环境

  • Quartz可以嵌入到另一个独立的应用程序中运行
  • Quartz可以在应用程序服务器(或servlet容器)中实例化,并参与XA事务
  • Quartz可以作为一个独立的程序(在它自己的Java虚拟机中)运行,通过RMI使用
  • 可以将Quartz实例化为一个独立程序集群(具有负载平衡和故障转移功能),用于执行作业

持久化

  • Quartz的设计包括一个JobStore接口,该接口可以实现为作业的存储提供各种机制。
  • 通过使用包含的Jdbc JobStore,所有配置为“非易失性”的job和触发器都通过JDBC存储在关系数据库中。
  • 使用包含的RAM JobStore,所有的job和触发器都存储在RAM中,因此不会在程序执行之间持久化——但这样做的好处是不需要外部数据库

集群

  • 故障转移
  • 负载平衡
  • Quartz的内置集群特性依赖于通过JDBCJobStore(前面描述过)实现的数据库持久性。
  • Terracotta对Quartz的扩展提供了集群功能,而不需要支持数据库。

使用说明

可以使用 SchedulerFactory 类来达到程序调度的目的,一旦调度器实例化后,它就能够启动,等待执行和关闭。需要注意的是一旦调度器调用 了shutdown 方法关闭后,如果不重新实例化,它就不会启动了。触发器在调度器未启动时,或是终止状态时,都不会被触发

  • Scheduler - 与调度程序交互的主要API。
  • Job - 你想要调度器执行的任务组件需要实现的接口
  • JobDetail - 用于定义作业的实例。
  • Trigger - 触发器,定义执行给定作业的计划的组件。
  • JobBuilder - 用于定义/构建 JobDetail 实例,用于定义作业的实例。
  • TriggerBuilder - 用于定义/构建触发器实例。
  • Scheduler 的生命期 - 从 SchedulerFactory 创建它时开始,到 Scheduler 调用shutdown() 方法时结束;Scheduler 被创建后,可以增加、删除和列举 Job 和 Trigger,以及执行其它与调度相关的操作(如暂停 Trigger)。但是,Scheduler 只有在调用 start() 方法后,才会真正地触发 trigger(即执行 job)

Spring Boot 整合Quartz

我们本篇的主角ElasticJob底层是依赖Quartz实现的,所以我们有必要先简单了解Quartz使用,本篇采用jdbc持久化模式,我们这里选择基于mysql的持久化,因此需要将tables_mysql_innodb.sql包含11张表导入到mysql数据库中

静态配置Quartz任务

pom文件内容如下,大部分都是常见启动器,我们重点是spring-boot-starter-quartz

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.itxs</groupId>
    <artifactId>spring-task</artifactId>
    <version>1.0-SNAPSHOT</version>
    <parent>
        <artifactId>spring-boot-starter-parent</artifactId>
        <groupId>org.springframework.boot</groupId>
        <version>2.5.2</version>
    </parent>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <quartz-springboot.version>2.5.2</quartz-springboot.version>
        <lombok.version>1.18.20</lombok.version>
        <druid.version>1.2.6</druid.version>
        <mysql.version>8.0.25</mysql.version>
        <mybatis-plus.version>3.4.0</mybatis-plus.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-configuration-processor</artifactId>
            <optional>true</optional>
        </dependency>
        //Spring Boot整合Quartz的启动器
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-quartz</artifactId>
            <version>${quartz-springboot.version}</version>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>${lombok.version}</version>
            <scope>provided</scope>
            <optional>true</optional>
        </dependency>
      
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>${mysql.version}</version>
        </dependency>

        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>druid-spring-boot-starter</artifactId>
            <version>${druid.version}</version>
        </dependency>

        <dependency>
            <groupId>com.baomidou</groupId>
            <artifactId>mybatis-plus-boot-starter</artifactId>
            <version>${mybatis-plus.version}</version>
        </dependency>

    </dependencies>
</project>

application配置文件,profiles激活dev环境,在dev环境中进行数据库配置,include: quartz包含一个单独配置文件,在里面可以配置多个Quantz的任务参数

application.yml

spring:
  application:
    name: itxs-spring-task
  profiles:
    active: dev
    include: quartz
  quartz:
    job-store-type: jdbc # 使用数据库存储
    scheduler-name: cluster_scheduler # 相同 Scheduler 名字的节点,形成一个 Quartz 集群
    wait-for-jobs-to-complete-on-shutdown: true # 应用关闭时,是否等待定时任务执行完成。默认为 false ,建议设置为 true
    jdbc:
      initialize-schema: never # 是否自动使用 SQL 初始化 Quartz 表结构。这里设置成 never ,我们手动创建表结构。
mybatis-plus:
  #  mapper-locations: classpath:mapper/*.xml
  global-config:
    db-config:
      id-type: auto
      logic-delete-field: deleted
      logic-delete-value: 1
      logic-not-delete-value: 0
  configuration:
    map-underscore-to-camel-case: on
    call-setters-on-nulls: on
    log-impl: org.apache.ibatis.logging.stdout.StdOutImpl

application-dev.yml

server:
  port: 8080
spring:
  datasource:
    driver-class-name: com.mysql.cj.jdbc.Driver
    url: jdbc:mysql://192.168.3.117:3306/testdb?useSSL=false&useUnicode=true&characterEncoding=utf-8&serverTimezone=GMT%2B8
    username: itxs
    password: itxs@123
    type: com.alibaba.druid.pool.DruidDataSource
    druid:
      max-active: 1000
      min-idle: 5
      initial-size: 10

application-quartz.yml

quartz:
  # jobGroup名称一致的情况下,不可出现相同jobName
  jobs[0]:
    jobName: myJob1
    # 以服务名为组名
    jobGroup: myGroup1
    # 业务逻辑处理类的包名
    jobClassName: com.itxs.scheduler.MySpringQuartzOneJob
    # 触发器名称
    triggerName: myTrigger1
    # cron表达式 每30秒执行一次
    cronExpression: 0/30 * * * * ?
    # 任务状态 1 正常 0 暂停
    triggerState: 1
    # 排序
    sort: 1
  jobs[1]:
    jobName: myJob2
    # 以服务名为组名
    jobGroup: myGroup2
    # 业务逻辑处理类的包名
    jobClassName: com.itxs.scheduler.MySpringQuartzSecondJob
    # 触发器名称
    triggerName: myTrigger2
    # cron表达式 每分钟执行一次
    cronExpression: 0 * * * * ?
    # 任务状态 1 正常 0 暂停
    triggerState: 1
    # 排序
    sort: 2

配置类,主要配置schedulerFactoryBean和线程池,初始化quartz的scheduler

package com.itxs.config;

import org.quartz.Scheduler;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import org.springframework.scheduling.quartz.SchedulerFactoryBean;
import javax.sql.DataSource;
import java.util.concurrent.Executor;

@Configuration
public class ScheduleConfig {

    @Autowired
    private DataSource dataSource;

    @Bean
    public TaskScheduler taskScheduler(){
        ThreadPoolTaskScheduler threadPoolTaskScheduler = new ThreadPoolTaskScheduler();
        threadPoolTaskScheduler.setPoolSize(10);
        threadPoolTaskScheduler.setThreadNamePrefix("spring-task-thread");
        return threadPoolTaskScheduler;
    }

    @Value("${spring.quartz.job-store-type}")
    private String storeType;

    @Bean
    public Scheduler scheduler(){
        return schedulerFactoryBean().getScheduler();
    }

    @Bean
    public SchedulerFactoryBean schedulerFactoryBean(){
        SchedulerFactoryBean schedulerFactoryBean = new SchedulerFactoryBean();
        schedulerFactoryBean.setSchedulerName("cluster_scheduler");
        if (storeType.equals("jdbc")){
            schedulerFactoryBean.setDataSource(dataSource);
        }
        schedulerFactoryBean.setApplicationContextSchedulerContextKey("application");
        schedulerFactoryBean.setTaskExecutor(schedulerThreadPool());
        schedulerFactoryBean.setStartupDelay(0);
        return schedulerFactoryBean;
    }

    @Bean
    public Executor schedulerThreadPool(){
        ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
        threadPoolTaskExecutor.setCorePoolSize(Runtime.getRuntime().availableProcessors());
        threadPoolTaskExecutor.setMaxPoolSize(Runtime.getRuntime().availableProcessors());
        threadPoolTaskExecutor.setQueueCapacity(Runtime.getRuntime().availableProcessors());
        return threadPoolTaskExecutor;
    }
}

Quartz实现类,集成QuartzJobBean实现executeInternal的接口

package com.itxs.scheduler;
import lombok.extern.slf4j.Slf4j;
import org.quartz.*;
import org.springframework.scheduling.quartz.QuartzJobBean;
import org.springframework.stereotype.Component;
import java.util.Date;

/**
 * 运行在spring
 */
@PersistJobDataAfterExecution
@DisallowConcurrentExecution
@Slf4j
@Component
public class MySpringQuartzJob extends QuartzJobBean {

    @Override
    protected void executeInternal(JobExecutionContext jobExecutionContext){
        try {
            log.info("MySpringQuartzJob------调度实例:{},任务名称:{},执行时间:{}" + jobExecutionContext.getScheduler().getSchedulerInstanceId(),
                    jobExecutionContext.getJobDetail().getKey().getName(),new Date());
        } catch (SchedulerException e) {
            e.printStackTrace();
        }
    }
}

接下来是创建监听器并将job启动执行,在Spring 容器刷新后执行监听器,SpringQuartzApplicationListener是在将Quantz Job配置直接写在代码里,而SpringQuartzYamlApplicationListener则是读取application-quartz.yml里面的每个job的配置然后循环创建

package com.itxs.listener;

import com.itxs.scheduler.MySpringQuartzJob;
import lombok.extern.slf4j.Slf4j;
import org.quartz.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class SpringQuartzApplicationListener implements ApplicationListener<ContextRefreshedEvent> {

    @Autowired
    private Scheduler scheduler;

    @Override
    public void onApplicationEvent(ContextRefreshedEvent contextRefreshedEvent) {
        log.info("SpringQuartzApplicationListener quartz调度任务创建开始-------");
        TriggerKey triggerKey = TriggerKey.triggerKey("trigger1", "group1");
        try {
            Trigger trigger = scheduler.getTrigger(triggerKey);
            System.out.println(scheduler.getSchedulerName());
            if (trigger == null){
                trigger = TriggerBuilder.newTrigger()
                        .withIdentity(triggerKey)
                        .withSchedule(CronScheduleBuilder.cronSchedule("0 * * * * ?"))
                        .startNow()
                        .build();
                JobDetail jobDetail = JobBuilder.newJob(MySpringQuartzJob.class)
                        .withIdentity("job1","group1")
                        .build();
                scheduler.scheduleJob(jobDetail,trigger);
                scheduler.start();
            }
        } catch (SchedulerException e) {
            e.printStackTrace();
        }
        log.info("SpringQuartzApplicationListener quartz调度任务创建结束-------");
    }
}
package com.itxs.listener;


import com.itxs.pojo.JobEntity;
import com.itxs.utils.QuartzEnum;
import lombok.extern.slf4j.Slf4j;
import org.quartz.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.stereotype.Component;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

@Slf4j
@Component
@ConfigurationProperties(prefix = "quartz")
public class SpringQuartzYamlApplicationListener implements ApplicationListener<ContextRefreshedEvent> {

    @Autowired
    private Scheduler scheduler;

    List<JobEntity> jobs = new ArrayList<>();

    public List<JobEntity> getJobs() {
        return jobs;
    }

    public void setJobs(List<JobEntity> jobs) {
        this.jobs = jobs;
    }

    @Override
    public void onApplicationEvent(ContextRefreshedEvent contextRefreshedEvent) {
        log.info("SpringQuartzYamlApplicationListener quartz调度任务创建开始-------");
        for(JobEntity entity : jobs) {
            log.info("调度任务-------"+jobs.toString());
            // 当定时任务状态为0时,不启动
            if (entity.getTriggerState() == QuartzEnum.PAUSED.getCode()) {
                continue;
            }

            try {
                Class<? extends Job> jobClass = (Class<? extends Job>) (Class.forName(entity.getJobClassName()).newInstance().getClass());
                if (jobClass != null){
                    TriggerKey triggerKey = TriggerKey.triggerKey(entity.getTriggerName(), entity.getJobGroup());
                    Trigger trigger = scheduler.getTrigger(triggerKey);
                    if (trigger == null){
                        trigger = TriggerBuilder.newTrigger()
                                .withIdentity(triggerKey)
                                .withSchedule(CronScheduleBuilder.cronSchedule(entity.getCronExpression()))
                                .startNow()
                                .build();

                        Map<String,Object> map = new HashMap<>();
                        map.put("objectName","object");
                        JobDataMap jobDataMap = new JobDataMap(map);

                        JobDetail jobDetail = JobBuilder.newJob(jobClass)
                                .usingJobData(jobDataMap)
                                .withIdentity(entity.getJobName(), entity.getJobGroup())
                                .build();
                        scheduler.scheduleJob(jobDetail,trigger);
                    }
                }
            } catch (SchedulerException | ClassNotFoundException | InstantiationException | IllegalAccessException e) {
                e.printStackTrace();
            }
        }

        try {
            scheduler.start();
        } catch (SchedulerException e) {
            e.printStackTrace();
        }
        log.info("SpringQuartzYamlApplicationListener quartz调度任务创建结束-------");
    }
}

调用的Json参数为

{
    "cron": "0/3 * * * * ?",
    "beginTime": "2021-08-27",
    "clazzName": "com.itxs.scheduler.RemindJob",
    "jobGroup": "mygroup",
    "jobName": "myjob",
    "parmas": "elastic job dynamic hello world"
}

启动Spring Boot启动类,日志显示quartz使用db持久化方式,所有的job实现类也按照配置参数定时执行,并写持久化到mysql数据库里quartz表里

动态配置Quantz

如果我们需要针对定时任务进行创建、停止等操作,那么我们需要动态操作Quantz,本篇也基于Spring Boot + Quartz封装任务调度实现了动态管理

详细代码可以到gitee项目源码里获取

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-72e5t825-1630077809479)(file://F:\creation\markdown\article\ElasticJob%E5%88%86%E5%B8%83%E5%BC%8F%E4%BB%BB%E5%8A%A1%E8%B0%83%E5%BA%A6%E5%BA%94%E7%94%A8\image-20210827150653182.png?lastModify=1630077763)]

ElasticJob-分布式任务调度

其他框架名词了解

开源的分布式任务或作业调度框架除了我们本篇当当的ElasticJob,还有大众点评开发人员许雪里的XXL-JOB、唯品会Saturn、淘宝的TBSchedule和SchedulerX,此外另外一个在大数据批处理作业调度器Azkaban也非常出名,

XXL-JOB:是一个分布式任务调度平台,其核心设计目标是开发迅速、学习简单、轻量级、易扩展。

TBSchedule:一个简洁的分布式任务调度引擎,基于ZooKeeper纯Java实现,由Alibaba开源。

SchedulerX:Alibaba Cloud SchedulerX精确、高可靠、高可用的调度任务服务,响应时间在秒内,SchedulerX(分布式任务调度) 是隶属于阿里云EDAS产品的组件, Spring Cloud AliCloud SchedulerX 提供了在Spring Cloud的配置规范下,分布式任务调度的功能支持。SchedulerX可提供秒级、精准、高可靠、高可用的定时任务调度服务,并支持多种类型的任务调度,如简单单机任务、简单多机任务、脚本任务以及网格任务。

Saturn:来自唯品会开发的一个分布式、容错和高可用的作业调度服务。

此外,这里也提一下Azkaban,Linkedin开源的一个批量工作流调度器,实现可以一个工作流内,多个作业可以按照特定的顺序执行,作业之间的顺序关系依靠key-value的形式来建立依赖关系,并提供可视化界面编制作业的工作流程。

由于我们基于Java技术大都以SpringBoot开发为主,ElasticJob与Spring整合也相当不错,且ElasticJob子项目ElasticJob-Lite定位为轻量级无中心化解决方案,所以本篇我们主要推荐使用ElasticJob分布式任务调度框架

概述

shardingsphere官方网站** ,ShardingSphere 已于2020年4月16日成为 Apache 软件基金会的顶级项目**

ElasticJob官方网站** ElasticJob作为Apache ShardingSphere的子项目**

Apache ShardingSphere 是一套开源的分布式数据库解决方案组成的生态圈,它由 JDBC、Proxy 和 Sidecar(规划中)这 3 款既能够独立部署,又支持混合部署配合使用的产品组成。 它们均提供标准化的数据水平扩展、分布式事务和分布式治理等功能,可适用于如 Java 同构、异构语言、云原生等各种多样化的应用场景。

Apache ShardingSphere 旨在充分合理地在分布式的场景下利用关系型数据库的计算和存储能力,而并非实现一个全新的关系型数据库。 关系型数据库当今依然占有巨大市场份额,是企业核心系统的基石,未来也难于撼动,我们更加注重在原有基础上提供增量,而非颠覆。

Apache ShardingSphere 5.x 版本开始致力于可插拔架构,项目的功能组件能够灵活的以可插拔的方式进行扩展。 目前,数据分片、读写分离、数据加密、影子库压测等功能,以及 MySQL、PostgreSQL、SQLServer、Oracle 等 SQL 与协议的支持,均通过插件的方式织入项目。 开发者能够像使用积木一样定制属于自己的独特系统。Apache ShardingSphere 目前已提供数十个 SPI 作为系统的扩展点,仍在不断增加中。

  • ElasticJob 是一个面向互联网生态和海量任务的分布式调度解决方案,由 2 个相互独立的子项目 ElasticJob-Lite 和 ElasticJob-Cloud 组成。
  • ElasticJob-Lite 定位为轻量级无中心化解决方案,使用jar的形式提供分布式任务的协调服务;
  • ElasticJob-Cloud 使用 Mesos 的解决方案,额外提供资源治理、应用分发以及进程隔离等服务。
  • ElasticJob 通过弹性调度、资源管控、以及作业治理的功能,打造一个适用于互联网场景的分布式调度解决方案,并通过开放的架构设计,提供多元化的作业生态。
  • ElasticJob 的各个产品使用统一的作业 API,开发者仅需要一次开发,即可随意部署
  • 使用 ElasticJob 能够让开发工程师不再担心任务的线性吞吐量提升等非功能需求,使他们能够更加专注于面向业务编码设计; 同时,它也能够解放运维工程师,使他们不必再担心任务的可用性和相关管理需求,只通过轻松的增加服务节点即可达到自动化运维的目的。
  • ElasticJob实现分布式特性主要依赖于Zookeeper,比如leader选举、弹性扩缩容、故障转移、负载均衡等机制

可以通过快速入门快速体验ElasticJob

架构

ElasticJob-Lite

定位为轻量级无中心化解决方案,使用 jar 的形式提供分布式任务的协调服务。

ElasticJob-Cloud

采用自研 Mesos Framework 的解决方案,额外提供资源治理、应用分发以及进程隔离等功能。

特性 ElasticJob-Lite ElasticJob-Cloud
无中心化
资源分配 不支持 支持
作业模式 常驻 常驻 + 瞬时
部署依赖 ZooKeeper ZooKeeper + Mesos

功能列表

  • 弹性调度
    • 支持任务在分布式场景下的分片和高可用
    • 能够水平扩展任务的吞吐量和执行效率
    • 任务处理能力随资源配备弹性伸缩
  • 资源分配
    • 在适合的时间将适合的资源分配给任务并使其生效
    • 相同任务聚合至相同的执行器统一处理
    • 动态调配追加资源至新分配的任务
  • 作业治理
    • 失效转移
    • 错过作业重新执行
    • 自诊断修复
  • 作业依赖(TODO)
    • 基于有向无环图(DAG)的作业间依赖
    • 基于有向无环图(DAG)的作业分片间依赖
  • 作业开放生态
    • 可扩展的作业类型统一接口
    • 丰富的作业类型库,如数据流、脚本、HTTP、文件、大数据等
    • 易于对接业务作业,能够与 Spring 依赖注入无缝整合
  • 可视化管控端
    • 作业管控端
    • 作业执行历史数据追踪
    • 注册中心管理

基本使用

作业类型

静态任务配置

前面我们学习Spring Boot 整合Quartz的使用,ElasticJob使用可所谓简单至极,还是原来我们说的Spring Boot的三板斧,加依赖和配置,另外增加实现类Ok搞掂。由于我们还用使用之前工程项目,因此依赖和配置内容较多,核心是添加elasticjob-lite-spring-boot-starter和elasticjob项配置

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.itxs</groupId>
    <artifactId>spring-task</artifactId>
    <version>1.0-SNAPSHOT</version>
    <parent>
        <artifactId>spring-boot-starter-parent</artifactId>
        <groupId>org.springframework.boot</groupId>
        <version>2.5.2</version>
    </parent>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <quartz-springboot.version>2.5.2</quartz-springboot.version>
        <lombok.version>1.18.20</lombok.version>
        <druid.version>1.2.6</druid.version>
        <mysql.version>8.0.25</mysql.version>
        <mybatis-plus.version>3.4.0</mybatis-plus.version>
        <elasticjob-lite-core.version>3.0.0-RC1</elasticjob-lite-core.version>
        <curator.version>5.2.0</curator.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-configuration-processor</artifactId>
            <optional>true</optional>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-quartz</artifactId>
            <version>${quartz-springboot.version}</version>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>${lombok.version}</version>
            <scope>provided</scope>
            <optional>true</optional>
        </dependency>

        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>${mysql.version}</version>
        </dependency>

        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>druid-spring-boot-starter</artifactId>
            <version>${druid.version}</version>
        </dependency>

        <dependency>
            <groupId>com.baomidou</groupId>
            <artifactId>mybatis-plus-boot-starter</artifactId>
            <version>${mybatis-plus.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.shardingsphere.elasticjob</groupId>
            <artifactId>elasticjob-lite-core</artifactId>
            <version>${elasticjob-lite-core.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.shardingsphere.elasticjob</groupId>
            <artifactId>elasticjob-lite-spring-boot-starter</artifactId>
            <version>${elasticjob-lite-core.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-client</artifactId>
            <version>${curator.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-framework</artifactId>
            <version>${curator.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-recipes</artifactId>
            <version>${curator.version}</version>
        </dependency>

    </dependencies>
</project>
spring:
  application:
    name: itxs-spring-task
  profiles:
    active: dev
    include: quartz
  quartz:
    job-store-type: jdbc # 使用数据库存储
    scheduler-name: cluster_scheduler # 相同 Scheduler 名字的节点,形成一个 Quartz 集群
    wait-for-jobs-to-complete-on-shutdown: true # 应用关闭时,是否等待定时任务执行完成。默认为 false ,建议设置为 true
    jdbc:
      initialize-schema: never # 是否自动使用 SQL 初始化 Quartz 表结构。这里设置成 never ,我们手动创建表结构。
elasticjob:
  reg-center:
    server-lists: 192.168.50.201:2181,192.168.50.204:2182,192.168.50.153:2183
    namespace: itxs-elastic-job
  jobs:
    elasticDemoOneJob:
      elasticJobClass: com.itxs.scheduler.ElasticDemoOneJob
      cron: 0/30 * * * * ?
      shardingTotalCount: 1
      shardingItemParameters: 0=Beijing
mybatis-plus:
  #  mapper-locations: classpath:mapper/*.xml
  global-config:
    db-config:
      id-type: auto
      logic-delete-field: deleted
      logic-delete-value: 1
      logic-not-delete-value: 0
  configuration:
    map-underscore-to-camel-case: on
    call-setters-on-nulls: on
    log-impl: org.apache.ibatis.logging.stdout.StdOutImpl

ElasticJob简单实现类示例

package com.itxs.scheduler;

import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.elasticjob.api.ShardingContext;
import org.apache.shardingsphere.elasticjob.simple.job.SimpleJob;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;

/**
 * 运行在spring
 */

@Slf4j
@Component
@Scope("prototype")
public class ElasticDemoOneJob implements SimpleJob {

    @Override
    public void execute(ShardingContext shardingContext) {
        log.info("ElasticDemoOneJob Start ------jobname={},taskid={},parameter={},shardingitem={},shardingparameter={}",shardingContext.getJobName(),
                shardingContext.getTaskId(),shardingContext.getJobParameter(),
                shardingContext.getShardingItem(),shardingContext.getShardingParameter());
    }
}

搞掂运行,出现我们ElasticDemoOneJob调度日志,我们再启动一个8081端口,也即是同时有多个进程实现调度任务,发现目前的Job一直在8080这个进程上运行,当我们关闭8080端口这个SpringBoot程序后,过一会8081端口这个微服务就会ElasticDemoOneJob调度日志执行任务接替原来8080定时任务。注意由于我们job元数据信息是存在zookeeper里面,如果我们没有使用覆盖等级制,重新修改job配置参数后没有生效,建议先删除掉zookeeper的节点数据然后启动再服务

当我们配置分片后,比如配置为5个分配,启动多个进程会将分片负载均衡分配到各个进程任务去支持,比如当前8082则执行0和3两个分片,8080是2分片,8081是1和4两个分片,也即是根据当前可用节点数据针对分片数量进行动态调整,这种场景比较适用于处理任务执行时间较长需要处理的数据较大

动态任务

如果我们需要动态创建启动和停止ElasticJob,我们可以自己实现封装,具体如下

动态创建启用和停止的接口声明类

package com.itxs.service;

import com.itxs.pojo.SysTask;

/**
 * @Description 任务管理
 * @Version 1.0
 */
public interface IElasticJobService {

    /**
     * @Description //添加一个任务
     * @Param [sysTask]
     * @return boolean
     */
    boolean addJob(SysTask sysTask) throws Exception;

    /**
     * @Description //删除某个任务
     * @Param [sysTask]
     * @return boolean
     */
    boolean deleteJob(SysTask sysTask)  throws Exception;
}

接口实现类:

package com.itxs.service.impl;

import com.itxs.pojo.SysTask;
import com.itxs.service.IElasticJobService;
import org.apache.shardingsphere.elasticjob.api.JobConfiguration;
import org.apache.shardingsphere.elasticjob.lite.api.bootstrap.impl.ScheduleJobBootstrap;
import org.apache.shardingsphere.elasticjob.reg.base.CoordinatorRegistryCenter;
import org.apache.shardingsphere.elasticjob.reg.zookeeper.ZookeeperConfiguration;
import org.apache.shardingsphere.elasticjob.reg.zookeeper.ZookeeperRegistryCenter;
import org.apache.shardingsphere.elasticjob.simple.job.SimpleJob;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;

@Service
public class ElasticJobServiceImpl implements IElasticJobService {

    @Value("${elasticjob.reg-center.server-lists}")
    private String serverList;
    @Value("${elasticjob.reg-center.namespace}")
    private String namespace;

    @Override
    public boolean addJob(SysTask sysTask){
        try {
            Class jobClass = Class.forName(sysTask.getClazzName());
            JobConfiguration jobConfig = JobConfiguration.newBuilder(sysTask.getJobName(),1)
                    .cron(sysTask.getCron()).overwrite(true).jobParameter(sysTask.getParmas()).disabled(false).build();
            ScheduleJobBootstrap scheduleJobBootstrap = new ScheduleJobBootstrap(createRegistryCenter(), (SimpleJob)jobClass.newInstance(), jobConfig);
            scheduleJobBootstrap.schedule();
            return true;
        }catch (Exception e){
            return false;
        }
    }

    @Override
    public boolean deleteJob(SysTask sysTask){
        try {
            Class jobClass = Class.forName(sysTask.getClazzName());
            JobConfiguration jobConfig = JobConfiguration.newBuilder(sysTask.getJobName(),1)
                    .cron(sysTask.getCron()).overwrite(true).jobParameter(sysTask.getParmas()).disabled(true).build();
            ScheduleJobBootstrap scheduleJobBootstrap = new ScheduleJobBootstrap(createRegistryCenter(), (SimpleJob)jobClass.newInstance(), jobConfig);
            scheduleJobBootstrap.schedule();
            return true;
        }catch (Exception e){
            return false;
        }
    }

    public CoordinatorRegistryCenter createRegistryCenter() {
        CoordinatorRegistryCenter regCenter = new ZookeeperRegistryCenter(new ZookeeperConfiguration(serverList, namespace));
        regCenter.init();
        return regCenter;
    }
}

在原来controller上增加addElasticTask和deleteElasticTask两个方法

package com.itxs.controller;


import com.baomidou.mybatisplus.core.toolkit.StringUtils;
import com.itxs.pojo.SysTask;
import com.itxs.service.IElasticJobService;
import com.itxs.service.IJobManageService;
import com.itxs.utils.JsonResult;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;


/**
 * @Description 任务管理控制,此处只实现了增加和删除,
 * @Version 1.0
 */
@Slf4j
@RestController
@RequestMapping("/quartz")
public class SysTaskController {

    @Autowired
    private IJobManageService jobManageService;

    @Autowired
    private IElasticJobService iElasticJobService;

    private boolean validateParmas(SysTask task) {
        return task != null && StringUtils.isNotBlank(task.getCron())
                && StringUtils.isNotBlank(task.getJobGroup())
                && StringUtils.isNotBlank(task.getJobName());
    }

    @GetMapping("/add")
    public JsonResult addTask() {
        return JsonResult.success();
    }

    /**
     * @Description //添加一个任务
     * @Param [task]
     * @return com.quartz.result.JsonResult
     */
    @PostMapping("/add-task")
    public JsonResult addTask(@RequestBody SysTask task) {
        if (validateParmas(task)) {
            try {
                this.jobManageService.addJob(task);
                return JsonResult.success();
            } catch (Exception e) {
                log.error("添加任务异常,异常任务名称:" + task.getJobGroup() + "; 任务名称" + task.getJobName());
            }
        }
        return JsonResult.error();
    }

    @PostMapping("/add-elastic-task")
    public JsonResult addElasticTask(@RequestBody SysTask task) {
        if (validateParmas(task)) {
            try {
                this.iElasticJobService.addJob(task);
                return JsonResult.success();
            } catch (Exception e) {
                log.error("添加任务异常,异常任务名称:" + task.getJobGroup() + "; 任务名称" + task.getJobName());
            }
        }
        return JsonResult.error();
    }

    /**
     * @Description //删除一个任务
     * @Param [task]
     * @return com.quartz.result.JsonResult
     */
    @RequestMapping("/delete-elastic-task")
    public JsonResult deleteElasticTask(@RequestBody SysTask task) {
        if (validateParmas(task)) {
            try {
                this.iElasticJobService.deleteJob(task);

                return JsonResult.success();
            } catch (Exception e) {
                log.error("删除任务异常,异常任务名称:" + task.getJobGroup() + "; 任务名称" + task.getJobName());
            }
        }
        return JsonResult.error();
    }

    /**
     * @Description //删除一个任务
     * @Param [task]
     * @return com.quartz.result.JsonResult
     */
    @RequestMapping("/delete-task")
    public JsonResult deleteTask(@RequestBody SysTask task) {
        if (validateParmas(task)) {
            try {
                this.jobManageService.deleteJob(task);

                return JsonResult.success();
            } catch (Exception e) {
                log.error("删除任务异常,异常任务名称:" + task.getJobGroup() + "; 任务名称" + task.getJobName());
            }
        }
        return JsonResult.error();
    }
}

重新启动服务,端口为8080,post调用地址:http://192.168.3.224:8080/quartz/add-elastic-task,调用添加接口成功任务开始定时执行,查看zookeeper上也已经存储了新创建Job元数据信息,当我们调用删除任务接口后定时任务不再执行

{
    "cron": "0/5 * * * * ?",
    "beginTime": "2021-08-27",
    "clazzName": "com.itxs.scheduler.ElasticDemoJob",
    "jobGroup": "myelasticgroup",
    "jobName": "myelasticjob",
    "parmas": "real elastic job dynamic hello world"
}

有耐心看在这里的小伙伴们,恭喜你,已经入门了分布式任务调度

推荐文章