Skip to content

任务调度框架

任务调度框架有哪些呢?

这四个都是常用的任务调度框架,但各有侧重,适用于不同的场景。让我们分别来看一下它们的优缺点:

1. XXL-JOB:

  • 优点:
    • 轻量级: 相对其他框架,XXL-JOB 更轻量级,部署简单,学习成本低。
    • 功能丰富: 提供了丰富的功能,例如任务监控、报警、失败重试、任务分组等。
    • 可视化管理界面: 提供友好的可视化界面,方便管理和监控任务。
    • 支持多种编程语言: 除了Java,也支持其他语言,如Python、Go等。
    • 分布式: 支持分布式部署,提高任务处理能力。
  • 缺点:
    • 单点问题: 虽然支持分布式,但调度中心仍然存在单点问题,如果调度中心宕机,会影响所有任务的执行。
    • 功能扩展性: 一些高级功能可能需要自定义开发。

2. SchedulerX:

  • 优点:
    • 阿里巴巴出品: 源于阿里巴巴内部的调度系统,经过大量实践检验,稳定可靠。
    • 高可用性: 采用分布式架构,高可用性强,避免单点故障。
    • 功能强大: 提供强大的任务管理、监控、报警等功能。
    • 完善的生态: 与阿里巴巴的其他产品生态集成良好。
  • 缺点:
    • 学习成本较高: 相对XXL-JOB,学习成本较高,需要理解其复杂的架构。
    • 依赖性较强: 需要依赖阿里巴巴的生态系统。
    • 非开源: 部分功能可能需要付费使用。

3. Quartz:

  • 优点:
    • 成熟稳定: 一个久经考验的调度框架,非常稳定可靠。
    • 灵活配置: 配置灵活,可以满足各种不同的调度需求。
    • 丰富的插件: 支持丰富的插件扩展,可以方便地扩展功能。
    • 社区活跃: 拥有庞大的社区支持,遇到问题更容易找到解决方案。
  • 缺点:
    • 单机模式: 默认是单机模式,需要自行实现分布式部署。
    • 配置较复杂: 相对于XXL-JOB,配置较为复杂。

4. ElasticJob:

  • 优点:
    • 分布式: 天生支持分布式部署,可以轻松地扩展任务处理能力。
    • 数据一致性: 提供多种数据一致性保证机制。
    • 多种调度模式: 支持多种调度模式,例如简单、数据流等。
    • 易于集成: 易于集成到Spring等框架中。
  • 缺点:
    • 学习成本较高: 需要理解其分布式架构和多种调度模式。
    • 配置复杂: 配置相对复杂,需要一定的经验。

总结:

特性XXL-JOBSchedulerXQuartzElasticJob
轻量级
功能丰富
可视化界面
分布式支持支持需要自行实现支持
易用性
开源性开源部分开源开源开源
成熟度

选择哪个框架取决于你的具体需求。

  • 对于小型项目或快速原型开发,XXL-JOB 是一个不错的选择。 它轻量级,易于使用,功能也比较完善。
  • 对于需要高可用性和高性能的企业级应用,SchedulerX 或 ElasticJob 是更好的选择。 它们支持分布式部署,具有更好的可靠性和扩展性。
  • 对于需要高度灵活性和自定义功能的应用,Quartz 是一个不错的选择。 它提供了丰富的插件和扩展机制。

记住,没有最好的框架,只有最合适的框架。 你需要根据你的项目需求和团队的技术能力做出选择。

基于 Java 的国外分布式任务调度框架有哪些呢?

除了前面提到的 ElasticJob(虽然是国内开发,但广泛应用于国际),还有其他一些基于 Java 的国外分布式任务调度框架,虽然知名度可能不如 ElasticJob 或 XXL-JOB 高,但它们也各有特色:

  • Spring Batch: 虽然主要用于批处理,但它也可以被用来构建分布式任务调度系统。 通过结合 Spring Cloud 等技术,可以实现分布式协调和任务管理。 Spring Batch 的优势在于其强大的事务管理和可靠性保证,适合处理对数据一致性要求很高的任务。 然而,它的学习曲线相对陡峭,配置也相对复杂,对于简单的调度任务可能显得过于重量级。

  • Apache Hadoop YARN: YARN 本身不是一个任务调度框架,而是一个资源管理框架。但是,它可以结合其他调度器(例如 Apache Oozie)来实现分布式任务调度。 YARN 的优势在于其强大的资源管理能力,可以更好地利用集群资源。 然而,YARN 的配置和使用较为复杂,需要一定的 Hadoop 知识。

  • Apache Oozie: 这是一个基于 Hadoop 的工作流调度系统,可以用来调度 MapReduce、Hive、Pig 等 Hadoop 作业。 它能够创建工作流,定义任务的依赖关系,以及处理任务失败等情况。 Oozie 适合处理复杂的工作流,但它主要用于 Hadoop 生态系统,如果你的任务不依赖于 Hadoop,则可能显得过于繁琐。

  • Apache Airflow: 虽然 Airflow 不是纯 Java 框架 (使用 Python),但它提供了丰富的 Java API,可以与 Java 应用集成。 Airflow 以其强大的工作流管理和可视化界面而闻名,可以创建和管理复杂的 DAG (有向无环图),用于定义任务的依赖关系。 它非常适合处理复杂的、数据依赖性较高的任务。

选择建议:

选择哪个框架取决于你的具体需求和项目背景:

  • 简单的任务调度,轻量级需求: 如果你的任务相对简单,且不需要复杂的依赖关系和工作流管理,那么可以考虑自己动手构建一个简单的基于 Java 的分布式任务调度系统,或者使用一些轻量级的开源组件。

  • 复杂的批处理任务,高可靠性要求: Spring Batch 是一个不错的选择,它提供强大的事务管理和可靠性保证。

  • 基于 Hadoop 的任务调度: Apache Oozie 是一个不错的选择,它可以很好地集成到 Hadoop 生态系统中。

  • 复杂的、数据依赖性较高的任务调度,需要强大的工作流管理: Apache Airflow (即使它主要用Python) 是一个非常强大的选择,其可视化界面和强大的功能非常适合处理复杂的任务。

  • 需要高度灵活和可扩展的调度方案: 需要自行评估和设计方案,并可能需要结合其他的中间件和技术。

记住,没有完美的框架,选择适合你项目需求的框架才是最重要的。 你需要权衡各个框架的优缺点,并根据你的团队的技术栈和经验做出最终决定。 同时,建议进行 PoC (Proof of Concept),尝试使用不同的框架,并评估其性能和适用性。

为什么需要分布式任务调度框架呢?

需要分布式任务调度框架主要是因为单机任务调度无法满足现代应用日益增长的需求。 单机任务调度存在以下几个关键的局限性:

  1. 可扩展性差: 单机任务调度只能在一个机器上运行,处理能力有限。当任务数量增加或任务执行时间变长时,单机系统很容易成为瓶颈,导致任务积压或延迟。 分布式任务调度框架可以将任务分散到多个机器上执行,显著提高系统的处理能力和吞吐量,实现水平扩展。

  2. 可靠性低: 单机系统一旦出现故障,所有正在运行的任务都会中断,导致数据丢失或任务失败。 分布式任务调度框架通常具有高可用性设计,即使部分机器出现故障,也能保证任务的正常执行,提高系统的可靠性。 通过冗余和容错机制,可以确保任务的完成。

  3. 资源利用率低: 单机任务调度无法充分利用多核CPU、多台服务器等硬件资源。 分布式任务调度框架可以将任务分配到不同的机器和CPU核上,提高资源利用率,降低成本。

  4. 难以维护: 单机任务调度系统的维护和管理比较复杂,特别是当任务数量较多时,维护工作量会急剧增加。 分布式任务调度框架通常提供友好的监控和管理界面,方便用户监控任务执行状态、管理任务和排查问题。

  5. 复杂任务协调困难: 对于复杂的、相互依赖的任务,单机调度难以有效地协调任务执行顺序和依赖关系。 分布式任务调度框架提供了更完善的机制来管理任务之间的依赖关系,确保任务按照正确的顺序执行,避免因为依赖关系错乱而导致任务失败。

分布式任务调度框架带来的好处:

  • 更高的吞吐量: 处理更多任务,更快地完成任务。
  • 更高的可靠性: 任务执行更稳定,减少数据丢失和任务失败。
  • 更好的资源利用率: 充分利用硬件资源,降低成本。
  • 更易于维护: 简化维护和管理工作。
  • 更好的可扩展性: 更容易扩展系统容量,以应对不断增长的任务数量。
  • 更强的容错能力: 系统即使部分故障,也能继续运行。

总而言之,分布式任务调度框架是应对现代应用高并发、高可靠性需求的必然选择。 它通过将任务分布到多个机器上执行,极大提高了系统的吞吐量、可靠性、可扩展性和资源利用率,使得大型复杂系统的维护和管理更加容易。

Spring Task

介绍

Spring Task 提供了一种方便的方式来在 Spring 应用程序中创建和管理定时任务。它基于 @Scheduled 注解,简化了定时任务的配置和管理,不需要额外的依赖,直接在 Spring Boot 项目中就能使用。 相比于 Quartz,Spring Task 更轻量级,适合简单的定时任务场景。

以下是如何使用 Spring Task 创建定时任务:

1. 依赖管理:

Spring Task 是 Spring Boot 的一部分,不需要额外添加依赖。 如果你使用的是 Spring Boot,那么你已经拥有了 Spring Task 的所有必要组件。

2. 创建定时任务类:

创建一个类,实现你的定时任务逻辑。 使用 @Component 注解将该类标记为 Spring Bean,并使用 @Scheduled 注解指定任务的执行时间。

java
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

@Component
public class MyScheduledTask {

    @Scheduled(fixedRate = 5000) // 每 5 秒执行一次
    public void myTask() {
        System.out.println("定时任务执行时间: " + new java.util.Date());
        // 你的任务逻辑
    }
}

3. @Scheduled 注解的属性:

@Scheduled 注解有多种属性,可以控制任务的调度方式:

  • fixedRate: 以固定的时间间隔执行任务。 单位为毫秒。
  • fixedDelay: 在任务执行完成后,延迟指定的时间后再次执行任务。 单位为毫秒。
  • initialDelay: 第一次执行任务前的延迟时间。 单位为毫秒。
  • cron: 使用 Cron 表达式指定任务的执行时间。 这是最灵活的方式,可以指定各种复杂的调度规则。 例如:0 0 10 * * ? (每天上午 10 点执行)。

例子:

  • 每 5 秒执行一次:

    java
    @Scheduled(fixedRate = 5000)
    public void myTask() { ... }
  • 每 10 秒执行一次,第一次延迟 3 秒执行:

    java
    @Scheduled(fixedRate = 10000, initialDelay = 3000)
    public void myTask() { ... }
  • 在每天上午 10 点执行:

    java
    @Scheduled(cron = "0 0 10 * * ?")
    public void myTask() { ... }
  • 每隔 5 秒执行一次,如果上次执行出现异常则延迟执行:

    java
    @Scheduled(fixedRate = 5000)
    public void myTask() {
        try {
            // 你的任务逻辑
        } catch (Exception e) {
            System.err.println("任务执行出错: " + e.getMessage());
            // 可以在此处添加错误处理逻辑,例如发送告警邮件
        }
    }

4. 启用定时任务:

在你的 Spring Boot 应用主类上添加 @EnableScheduling 注解,启用 Spring 的定时任务调度功能:

java
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableScheduling;

@SpringBootApplication
@EnableScheduling
public class MyApplication {
    public static void main(String[] args) {
        SpringApplication.run(MyApplication.class, args);
    }
}

5. Cron 表达式:

Cron 表达式是一个强大的工具,可以用来定义各种复杂的调度规则。 它由 6 个字段组成,分别表示:秒、分、时、日、月、周。 每个字段可以使用数字、星号 (*)、问号 (?)、逗号 (,)、短横线 (-) 等符号来表示不同的含义。 你可以参考 Cron 表达式规范文档来学习如何编写 Cron 表达式。 有很多在线工具可以帮助你生成 Cron 表达式。

Spring Task 的优缺点:

优点:

  • 简单易用: 使用 @Scheduled 注解即可轻松定义定时任务,无需复杂的配置。
  • 轻量级: 不需要额外的依赖,适合简单的定时任务场景。
  • 集成方便: 直接集成到 Spring 框架中。

缺点:

  • 功能相对简单: 相比于 Quartz 等专业定时任务框架,Spring Task 的功能相对简单,不适合复杂的调度需求。
  • 缺乏分布式协调: Spring Task 本身不具备分布式协调能力,如果需要在多个服务器上运行同一个定时任务,需要自行实现分布式协调机制。

选择 Spring Task 还是 Quartz 取决于你的项目需求。 对于简单的定时任务,Spring Task 足够胜任;对于复杂的调度需求,或者需要分布式协调的场景,建议使用 Quartz 或其他更强大的定时任务框架。

SpringBoot 项目启用 Spring Task

详细配置请参考https://gitee.com/dexterleslie/demonstration/tree/master/demo-spring-boot/demo-spring-boot-task

启用Spring Task

java
/**
 * Disable @EnableScheduling on Spring Tests
 * https://stackoverflow.com/questions/29014496/disable-enablescheduling-on-spring-tests
 */
@ConditionalOnProperty(
        value = "app.scheduling.enable", havingValue = "true", matchIfMissing = true
)
@Configuration
@EnableScheduling
public class ConfigTask {
    // 配置spring scheduling核心执行线程,spring默认执行线程数为1
    // https://stackoverflow.com/questions/29796651/what-is-the-default-scheduler-pool-size-in-spring-boot
    /*@Bean
    public TaskScheduler taskScheduler() {
        ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
        scheduler.setPoolSize(5);
        return scheduler;
    }*/
}

配置spring scheduling执行线程数

配置spring scheduling核心执行线程,spring默认执行线程数为1

application.properties 配置如下:

java
# https://blog.csdn.net/qq_35067322/article/details/103982304
spring.task.scheduling.pool.size=5

Cron表达式

Cron Trigger Tutorialhttps://www.quartz-scheduler.org/documentation/quartz-2.3.0/tutorials/crontrigger.html

详细用法请参考https://gitee.com/dexterleslie/demonstration/tree/master/demo-spring-boot/demo-spring-boot-task

  • 每秒执行一次

    java
    // 每秒触发一次
    @Scheduled(cron = "0/1 * * * * ?")
    public void doCronbEverySecond() {
        Date date = new Date();
        System.out.println("每秒触发一次:" + date);
    }
  • 每小时的每分钟的第1秒触发一次

    java
    // 每小时的每分钟的第1秒触发一次
    @Scheduled(cron = "1 * * * * ?")
    public void triggerOnceAtFirstSecondEveryMinute() {
        Date date = new Date();
        System.out.println("每小时的每分钟的第1秒触发一次:" + date);
    }
  • 每隔5秒触发一次,0/5 表示从0秒开始

    java
    // 每隔5秒触发一次,0/5 表示从0秒开始
    @Scheduled(cron = "0/5 * * * * ?")
    public void cronbTask() {
        Date date = new Date();
        System.out.println("每5秒触发一次:" + date);
    }
  • 每天13:06:03触发一次

    java
    // 每天13:06:03触发一次
    @Scheduled(cron = "3 6 13 * * ?")
    public void triggerTest1() {
        Date date = new Date();
        System.out.println("每天13:06:03触发一次:" + date);
    }
  • 每天13点每2分钟触发

    java
    // 每天13点每2分钟触发
    @Scheduled(cron = "0 0/2 13 * * ?")
    public void doCronbStartAt10Every2Minute() {
        Date date = new Date();
        System.out.println("每天13点每2分钟触发:" + date);
    }
  • 每分钟触发一次

    java
    // 每分钟触发一次
    @Scheduled(cron = "0 0/1 * * * ?")
    public void doCronbEveryMinute() {
        Date date = new Date();
        System.out.println("每分钟触发一次:" + date);
    }

xxl-job

介绍

XXL-JOB 是一个轻量级、分布式任务调度框架,由中国开发者开发,但因其易用性和功能丰富而在国际上也有一定的应用。 它解决了传统任务调度框架在分布式环境下的诸多痛点,例如:

XXL-JOB 的核心优势:

  • 轻量级: 部署简单,学习成本低,对资源消耗少。 这使其非常适合小型项目和快速原型开发。

  • 分布式: 支持分布式部署,可以轻松扩展任务处理能力,提高系统的整体性能和可靠性。 这避免了单点故障,任务调度更稳定。

  • 动态配置: 支持动态添加、修改和删除任务,无需重启服务。 这使得任务管理更加灵活方便。

  • 监控管理: 提供友好的可视化管理界面,可以方便地监控任务执行情况,例如任务状态、执行时间、执行日志等。 这大大方便了运维和问题排查。

  • 多种执行器模式: 支持多种执行器模式,例如:

    • BEAN模式: 直接在调度中心执行任务,适合简单的任务。
    • JAVA模式: 通过 Java 代码执行任务,适合复杂的业务逻辑。
    • SDK模式: 提供各种语言的 SDK,方便集成到不同的项目中。 支持多种语言例如:Java、Python、Go 等。
  • 失败重试: 支持任务失败重试,提高任务执行的可靠性。 可以配置重试次数和间隔时间。

  • 任务分组: 支持任务分组,方便管理大量的任务。

XXL-JOB 的架构:

XXL-JOB 主要由调度中心和执行器两部分组成:

  • 调度中心: 负责任务的管理、调度和监控。 它是一个独立的应用,可以部署在独立的服务器上。

  • 执行器: 负责执行任务。 它可以是独立的应用,也可以嵌入到其他的应用中。 多个执行器可以分布式部署,提高任务处理能力。

XXL-JOB 的缺点:

  • 单点问题 (调度中心): 虽然支持分布式执行器,但调度中心仍然存在单点问题。 如果调度中心宕机,会影响所有任务的调度。 虽然可以通过高可用方案(例如集群部署)来解决这个问题,但这增加了部署和管理的复杂性。

  • 功能扩展性: 一些高级功能可能需要自定义开发。

XXL-JOB 的适用场景:

  • 需要轻量级、易于部署的任务调度系统。
  • 需要分布式任务调度能力,提高任务处理效率和可靠性。
  • 需要方便的监控和管理功能。
  • 需要支持多种编程语言的执行器。

总结:

XXL-JOB 是一个非常优秀的开源任务调度框架,具有易用性强、功能丰富、性能优良等优点,适合各种规模的项目。 但也要注意其单点问题,需要根据实际情况选择合适的部署方案。 它是一个非常值得推荐的任务调度方案,特别适合那些希望快速构建一个可靠、高效的分布式任务调度系统的团队。

原理

XXL-JOB 的核心原理在于其基于中心化调度和分布式执行的架构设计。它通过一个中央调度中心来管理和调度所有任务,并将任务分配到多个分布式执行器上执行,从而实现高可用性和高性能。 具体来说,其工作原理可以分解为以下几个步骤:

1. 任务注册:

  • 开发者编写任务代码,并将其注册到 XXL-JOB 调度中心。 注册过程中,需要指定任务的名称、执行器、定时策略等信息。 XXL-JOB 支持多种编程语言(Java、Python、Go等),开发者可以根据需要选择合适的语言来编写任务代码。

2. 任务调度:

  • 调度中心根据预设的调度策略(例如 cron 表达式)定期检查需要执行的任务。 调度策略可以是固定的时间间隔,也可以是基于 cron 表达式的灵活调度。

3. 任务分配:

  • 调度中心会将待执行的任务分配给可用的执行器。 分配策略可以是轮询、权重等,以保证任务负载均衡。 调度中心会记录任务的执行状态,跟踪任务的执行进度。

4. 任务执行:

  • 执行器接收任务分配请求后,执行相应的任务代码。 执行器会将任务的执行结果反馈给调度中心。 任务执行过程中,执行器会记录日志信息,方便调试和排错。 执行器可以运行在不同的服务器上,从而实现分布式执行。

5. 结果反馈与监控:

  • 执行器将任务执行结果(成功或失败)以及日志信息反馈给调度中心。 调度中心会根据反馈结果更新任务状态,并进行相应的处理(例如重试机制)。 调度中心会监控任务的执行情况,并提供可视化的监控界面,方便用户查看任务执行状态、日志信息等。 如果任务失败,调度中心可以根据配置进行重试,也可以发送报警通知。

关键技术:

  • 中心化调度: 调度中心是系统的核心,负责所有任务的管理、调度和监控。 这简化了任务管理,也便于监控和管理。

  • 分布式执行: 任务执行分散到多个执行器上,提高了系统性能和可靠性。 执行器与调度中心之间通过网络通信进行交互。

  • 数据存储: XXL-JOB 使用数据库来存储任务信息、执行日志等数据。 这确保了数据持久性和可靠性。

  • 任务状态管理: 调度中心会跟踪每个任务的执行状态,并根据状态进行相应的处理。

  • 触发机制: 调度中心采用多种触发机制,可以根据需要触发任务执行。 例如,定时触发、手动触发、API 触发。

  • 容错机制: XXL-JOB 具有容错机制,可以处理执行器故障、网络故障等异常情况。 例如,任务失败重试、执行器心跳检测等。

总结:

XXL-JOB 的核心在于其中心化调度和分布式执行的架构设计,以及高效的任务管理、监控和容错机制。 它简化了分布式任务调度的复杂性,提高了系统的性能和可靠性,同时也提供了一个友好的用户界面,方便用户管理和监控任务。 然而,其调度中心仍然是单点,需要考虑高可用性解决方案来进一步增强系统的可靠性。

部署调度中心

https://cloud.tencent.com/developer/article/2334353

复制 xxl-job 数据库初始化脚本 https://github.com/xuxueli/xxl-job/blob/2.3.1/doc/db/tables_xxl_job.sql

xxl-job-db.sql 如下:

sql
#
# XXL-JOB v2.3.1
# Copyright (c) 2015-present, xuxueli.

CREATE database if NOT EXISTS `xxl_job` default character set utf8mb4 collate utf8mb4_unicode_ci;
use `xxl_job`;

SET NAMES utf8mb4;

CREATE TABLE `xxl_job_info` (
                                `id` int(11) NOT NULL AUTO_INCREMENT,
                                `job_group` int(11) NOT NULL COMMENT '执行器主键ID',
                                `job_desc` varchar(255) NOT NULL,
                                `add_time` datetime DEFAULT NULL,
                                `update_time` datetime DEFAULT NULL,
                                `author` varchar(64) DEFAULT NULL COMMENT '作者',
                                `alarm_email` varchar(255) DEFAULT NULL COMMENT '报警邮件',
                                `schedule_type` varchar(50) NOT NULL DEFAULT 'NONE' COMMENT '调度类型',
                                `schedule_conf` varchar(128) DEFAULT NULL COMMENT '调度配置,值含义取决于调度类型',
                                `misfire_strategy` varchar(50) NOT NULL DEFAULT 'DO_NOTHING' COMMENT '调度过期策略',
                                `executor_route_strategy` varchar(50) DEFAULT NULL COMMENT '执行器路由策略',
                                `executor_handler` varchar(255) DEFAULT NULL COMMENT '执行器任务handler',
                                `executor_param` varchar(512) DEFAULT NULL COMMENT '执行器任务参数',
                                `executor_block_strategy` varchar(50) DEFAULT NULL COMMENT '阻塞处理策略',
                                `executor_timeout` int(11) NOT NULL DEFAULT '0' COMMENT '任务执行超时时间,单位秒',
                                `executor_fail_retry_count` int(11) NOT NULL DEFAULT '0' COMMENT '失败重试次数',
                                `glue_type` varchar(50) NOT NULL COMMENT 'GLUE类型',
                                `glue_source` mediumtext COMMENT 'GLUE源代码',
                                `glue_remark` varchar(128) DEFAULT NULL COMMENT 'GLUE备注',
                                `glue_updatetime` datetime DEFAULT NULL COMMENT 'GLUE更新时间',
                                `child_jobid` varchar(255) DEFAULT NULL COMMENT '子任务ID,多个逗号分隔',
                                `trigger_status` tinyint(4) NOT NULL DEFAULT '0' COMMENT '调度状态:0-停止,1-运行',
                                `trigger_last_time` bigint(13) NOT NULL DEFAULT '0' COMMENT '上次调度时间',
                                `trigger_next_time` bigint(13) NOT NULL DEFAULT '0' COMMENT '下次调度时间',
                                PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

CREATE TABLE `xxl_job_log` (
                               `id` bigint(20) NOT NULL AUTO_INCREMENT,
                               `job_group` int(11) NOT NULL COMMENT '执行器主键ID',
                               `job_id` int(11) NOT NULL COMMENT '任务,主键ID',
                               `executor_address` varchar(255) DEFAULT NULL COMMENT '执行器地址,本次执行的地址',
                               `executor_handler` varchar(255) DEFAULT NULL COMMENT '执行器任务handler',
                               `executor_param` varchar(512) DEFAULT NULL COMMENT '执行器任务参数',
                               `executor_sharding_param` varchar(20) DEFAULT NULL COMMENT '执行器任务分片参数,格式如 1/2',
                               `executor_fail_retry_count` int(11) NOT NULL DEFAULT '0' COMMENT '失败重试次数',
                               `trigger_time` datetime DEFAULT NULL COMMENT '调度-时间',
                               `trigger_code` int(11) NOT NULL COMMENT '调度-结果',
                               `trigger_msg` text COMMENT '调度-日志',
                               `handle_time` datetime DEFAULT NULL COMMENT '执行-时间',
                               `handle_code` int(11) NOT NULL COMMENT '执行-状态',
                               `handle_msg` text COMMENT '执行-日志',
                               `alarm_status` tinyint(4) NOT NULL DEFAULT '0' COMMENT '告警状态:0-默认、1-无需告警、2-告警成功、3-告警失败',
                               PRIMARY KEY (`id`),
                               KEY `I_trigger_time` (`trigger_time`),
                               KEY `I_handle_code` (`handle_code`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

CREATE TABLE `xxl_job_log_report` (
                                      `id` int(11) NOT NULL AUTO_INCREMENT,
                                      `trigger_day` datetime DEFAULT NULL COMMENT '调度-时间',
                                      `running_count` int(11) NOT NULL DEFAULT '0' COMMENT '运行中-日志数量',
                                      `suc_count` int(11) NOT NULL DEFAULT '0' COMMENT '执行成功-日志数量',
                                      `fail_count` int(11) NOT NULL DEFAULT '0' COMMENT '执行失败-日志数量',
                                      `update_time` datetime DEFAULT NULL,
                                      PRIMARY KEY (`id`),
                                      UNIQUE KEY `i_trigger_day` (`trigger_day`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

CREATE TABLE `xxl_job_logglue` (
                                   `id` int(11) NOT NULL AUTO_INCREMENT,
                                   `job_id` int(11) NOT NULL COMMENT '任务,主键ID',
                                   `glue_type` varchar(50) DEFAULT NULL COMMENT 'GLUE类型',
                                   `glue_source` mediumtext COMMENT 'GLUE源代码',
                                   `glue_remark` varchar(128) NOT NULL COMMENT 'GLUE备注',
                                   `add_time` datetime DEFAULT NULL,
                                   `update_time` datetime DEFAULT NULL,
                                   PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

CREATE TABLE `xxl_job_registry` (
                                    `id` int(11) NOT NULL AUTO_INCREMENT,
                                    `registry_group` varchar(50) NOT NULL,
                                    `registry_key` varchar(255) NOT NULL,
                                    `registry_value` varchar(255) NOT NULL,
                                    `update_time` datetime DEFAULT NULL,
                                    PRIMARY KEY (`id`),
                                    KEY `i_g_k_v` (`registry_group`,`registry_key`,`registry_value`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

CREATE TABLE `xxl_job_group` (
                                 `id` int(11) NOT NULL AUTO_INCREMENT,
                                 `app_name` varchar(64) NOT NULL COMMENT '执行器AppName',
                                 `title` varchar(12) NOT NULL COMMENT '执行器名称',
                                 `address_type` tinyint(4) NOT NULL DEFAULT '0' COMMENT '执行器地址类型:0=自动注册、1=手动录入',
                                 `address_list` text COMMENT '执行器地址列表,多地址逗号分隔',
                                 `update_time` datetime DEFAULT NULL,
                                 PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

CREATE TABLE `xxl_job_user` (
                                `id` int(11) NOT NULL AUTO_INCREMENT,
                                `username` varchar(50) NOT NULL COMMENT '账号',
                                `password` varchar(50) NOT NULL COMMENT '密码',
                                `role` tinyint(4) NOT NULL COMMENT '角色:0-普通用户、1-管理员',
                                `permission` varchar(255) DEFAULT NULL COMMENT '权限:执行器ID列表,多个逗号分割',
                                PRIMARY KEY (`id`),
                                UNIQUE KEY `i_username` (`username`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

CREATE TABLE `xxl_job_lock` (
                                `lock_name` varchar(50) NOT NULL COMMENT '锁名称',
                                PRIMARY KEY (`lock_name`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

INSERT INTO `xxl_job_group`(`id`, `app_name`, `title`, `address_type`, `address_list`, `update_time`) VALUES (1, 'xxl-job-executor-sample', '示例执行器', 0, NULL, '2018-11-03 22:21:31' );
INSERT INTO `xxl_job_info`(`id`, `job_group`, `job_desc`, `add_time`, `update_time`, `author`, `alarm_email`, `schedule_type`, `schedule_conf`, `misfire_strategy`, `executor_route_strategy`, `executor_handler`, `executor_param`, `executor_block_strategy`, `executor_timeout`, `executor_fail_retry_count`, `glue_type`, `glue_source`, `glue_remark`, `glue_updatetime`, `child_jobid`) VALUES (1, 1, '测试任务1', '2018-11-03 22:21:31', '2018-11-03 22:21:31', 'XXL', '', 'CRON', '0 0 0 * * ? *', 'DO_NOTHING', 'FIRST', 'demoJobHandler', '', 'SERIAL_EXECUTION', 0, 0, 'BEAN', '', 'GLUE代码初始化', '2018-11-03 22:21:31', '');
INSERT INTO `xxl_job_user`(`id`, `username`, `password`, `role`, `permission`) VALUES (1, 'admin', 'e10adc3949ba59abbe56e057f20f883e', 1, NULL);
INSERT INTO `xxl_job_lock` ( `lock_name`) VALUES ( 'schedule_lock');

commit;

my-customize.cnf 内容如下:

properties
[mysqld]
slow_query_log=1
long_query_time=1
slow_query_log_file=slow-query.log
innodb_flush_log_at_trx_commit=0
max_allowed_packet=10m
key_buffer_size=512m
innodb_log_file_size=512m
innodb_log_buffer_size=256m
innodb_file_per_table=1
max_binlog_size=512m

# 一旦提供log_bin参数无论是何值或者不提供值时,表示启用binlog功能
# 不提供log_bin表示禁用binlog功能
log_bin
expire_logs_days=10
binlog_format=mixed
max_binlog_size=1024m
# 指定binlog文件的前缀
log_basename=master1
server_id=10001

docker-compose.yaml 内容如下:

yaml
version: "3.3"

services:
  xxl-job-admin:
    image: xuxueli/xxl-job-admin:2.3.1
#    ports:
#      - "8040:8080"
    network_mode: host
    environment:
      PARAMS: '
          --spring.datasource.url=jdbc:mysql://localhost:3306/xxl_job?useUnicode=true&characterEncoding=utf-8&zeroDateTimeBehavior=convertToNull&useSSL=true&serverTimezone=GMT%2B8&allowMultiQueries=true
          --spring.datasource.username=root
          --spring.datasource.password=123456
          --xxl.job.accessToken=xxl-job'
    depends_on:
      - db

  db:
    image: mariadb:10.4.19
    command:
      - --character-set-server=utf8mb4
      - --collation-server=utf8mb4_general_ci
      - --skip-character-set-client-handshake
      # 设置innodb-buffer-pool-size
      # https://stackoverflow.com/questions/64825998/how-to-change-the-default-config-for-mysql-when-using-docker-image
      - --innodb-buffer-pool-size=1g
    volumes:
      - ./my-customize.cnf:/etc/mysql/conf.d/my-customize.cnf:ro
      - ./xxl-job-db.sql:/docker-entrypoint-initdb.d/db.sql:ro
    environment:
      # 解决mysql cli中文乱码问题
      # https://blog.csdn.net/qq_44766883/article/details/128065916
      - LANG=C.UTF-8
      - TZ=Asia/Shanghai
      - MYSQL_ROOT_PASSWORD=123456
    network_mode: host

启动用 xxl-job-admin (调度中心)服务

bash
docker compose up -d

访问调度中心服务 http://localhost:8080/xxl-job-admin/,帐号:admin,密码:123456

部署执行器

参考官方示例配置执行器 https://github.com/xuxueli/xxl-job/blob/2.3.1/xxl-job-executor-samples/xxl-job-executor-sample-springboot

部署执行器本质是配置 SpringBoot 项目集成 xxl-job。

配置 pom 引用 xxl-job 依赖

xml
<!-- xxl-job 依赖 -->
<dependency>
    <groupId>com.xuxueli</groupId>
    <artifactId>xxl-job-core</artifactId>
    <version>2.3.1</version>
</dependency>

application.properties 配置

properties
logging.level.com.future.demo=debug

# no web
#spring.main.web-environment=false

# log config
#logging.config=classpath:logback.xml

### xxl-job admin address list, such as "http://address" or "http://address01,http://address02"
xxl.job.admin.addresses=http://127.0.0.1:8080/xxl-job-admin

### xxl-job, access token
xxl.job.accessToken=xxl-job

### xxl-job executor appname
xxl.job.executor.appname=xxl-job-executor-sample
### xxl-job executor registry-address: default use address to registry , otherwise use ip:port if address is null
xxl.job.executor.address=
### xxl-job executor server-info
xxl.job.executor.ip=
xxl.job.executor.port=9999
### xxl-job executor log-path
xxl.job.executor.logpath=${user.home}/xxl-job/jobhandler
### xxl-job executor log-retention-days
xxl.job.executor.logretentiondays=30

XxlJobConfig 配置

java
@Configuration
public class XxlJobConfig {
    private Logger logger = LoggerFactory.getLogger(XxlJobConfig.class);

    @Value("${xxl.job.admin.addresses}")
    private String adminAddresses;

    @Value("${xxl.job.accessToken}")
    private String accessToken;

    @Value("${xxl.job.executor.appname}")
    private String appname;

    @Value("${xxl.job.executor.address}")
    private String address;

    @Value("${xxl.job.executor.ip}")
    private String ip;

    @Value("${xxl.job.executor.port}")
    private int port;

    @Value("${xxl.job.executor.logpath}")
    private String logPath;

    @Value("${xxl.job.executor.logretentiondays}")
    private int logRetentionDays;


    @Bean
    public XxlJobSpringExecutor xxlJobExecutor() {
        logger.info(">>>>>>>>>>> xxl-job config init.");
        XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();
        xxlJobSpringExecutor.setAdminAddresses(adminAddresses);
        xxlJobSpringExecutor.setAppname(appname);
        xxlJobSpringExecutor.setAddress(address);
        xxlJobSpringExecutor.setIp(ip);
        xxlJobSpringExecutor.setPort(port);
        xxlJobSpringExecutor.setAccessToken(accessToken);
        xxlJobSpringExecutor.setLogPath(logPath);
        xxlJobSpringExecutor.setLogRetentionDays(logRetentionDays);

        return xxlJobSpringExecutor;
    }

    /**
     * 针对多网卡、容器内部署等情况,可借助 "spring-cloud-commons" 提供的 "InetUtils" 组件灵活定制注册IP;
     *
     *      1、引入依赖:
     *          <dependency>
     *             <groupId>org.springframework.cloud</groupId>
     *             <artifactId>spring-cloud-commons</artifactId>
     *             <version>${version}</version>
     *         </dependency>
     *
     *      2、配置文件,或者容器启动变量
     *          spring.cloud.inetutils.preferred-networks: 'xxx.xxx.xxx.'
     *
     *      3、获取IP
     *          String ip_ = inetUtils.findFirstNonLoopbackHostInfo().getIpAddress();
     */


}

启动 SpringBoot 应用后访问 xxl-job 控制台 http://localhost:8080/xxl-job-admin/jobgroup 可以查看到名为 xxl-job-executor-sample 记录的OnLine 机器地址列有信息表示执行器成功部署。

创建任务并执行

任务代码:

java
/**
 * XxlJob开发示例(Bean模式)
 * <p>
 * 开发步骤:
 * 1、任务开发:在Spring Bean实例中,开发Job方法;
 * 2、注解配置:为Job方法添加注解 "@XxlJob(value="自定义jobhandler名称", init = "JobHandler初始化方法", destroy = "JobHandler销毁方法")",注解value值对应的是调度中心新建任务的JobHandler属性的值。
 * 3、执行日志:需要通过 "XxlJobHelper.log" 打印执行日志;
 * 4、任务结果:默认任务结果为 "成功" 状态,不需要主动设置;如有诉求,比如设置任务结果为失败,可以通过 "XxlJobHelper.handleFail/handleSuccess" 自主设置任务结果;
 *
 * @author xuxueli 2019-12-11 21:52:51
 */
@Component
@Slf4j
public class SampleXxlJob {

    /**
     * 1、简单任务示例(Bean模式)
     */
    @XxlJob("demoJobHandler")
    public void demoJobHandler() throws Exception {
        XxlJobHelper.log("XXL-JOB, Hello World.");
        log.debug("XXL-JOB, Hello World.");
        // default success
    }

}

重启 SpringBoot 应用后登录 xxl-job 控制台 http://localhost:8080/xxl-job-admin/jobinfo 新增 job,job 信息如下:

  • 执行器选择示例执行器
  • 负责人填写xxl-job
  • 任务描述填写xxl-job
  • 调度类型选择CRON
  • Cron 表达式使用编辑器编辑到每5秒执行
  • 运行模式选择BEAN
  • JobHandler 填写 demoJobHandler
  • 路由策略选择第一个

点击保存按钮新增 job 后,选中以上创建的 job 点击功能操作>执行一次以触发 job 的一次运行,点击功能操作>启动以启用 job 自动执行。

任务参数

java
@XxlJob("paramJobHandler")
public void paramJobHandler() {
    String jobParam = XxlJobHelper.getJobParam();
    log.debug("jobParam {}", jobParam);
}

GLUE 模式

介绍

XXL-JOB的GLUE模式是一种非常灵活的任务配置方式,它允许开发者以源码的形式在调度中心维护任务,并支持实时编译,无需指定JobHandler。以下是对XXL-JOB GLUE模式的详细介绍及举例:

一、GLUE模式概述

  1. 特点
    • 任务以源码方式维护在调度中心。
    • 支持多种编程语言(如Java、Shell、Python等),但通常使用Java。
    • 可以使用@Resource/@Autowire注入执行器中的其他服务。
  2. 适用场景
    • 需要快速部署和测试定时任务。
    • 不想因为修改定时任务而重新发布整个应用。

二、GLUE模式使用步骤(以Java为例)

  1. 在调度中心新建任务
    • 登录XXL-JOB调度中心。
    • 进入任务管理页面,点击“新增任务”。
    • 设置任务的基本信息,如任务描述、负责人等。
    • 在“运行模式”中选择“GLUE模式(Java)”。
    • 设置任务的触发配置,如调度类型、CRON表达式等。
  2. 编辑GLUE脚本
    • 打开新建任务的GLUE IDE界面。
    • 在IDE中编写Java代码,实现定时任务的功能。例如,可以注入一个Service并调用其中的方法。
    • 保存并编译代码。
  3. 启动任务
    • 在任务管理页面,启动刚才创建的任务。
    • 观察执行器的控制台输出,确认任务是否按预期执行。

三、举例

假设有一个Spring Boot项目,其中有一个HelloService类,包含两个方法methodAmethodB,分别打印不同的信息。现在希望通过XXL-JOB的GLUE模式,定时调用methodA方法。

  1. 在Spring Boot项目中定义HelloService
java
@Service
public class HelloService {
    public void methodA() {
        System.out.println("执行MethodA的方法");
    }
 
    public void methodB() {
        System.out.println("执行MethodB的方法");
    }
}
  1. 在调度中心新建GLUE任务
    • 登录XXL-JOB调度中心。
    • 进入任务管理页面,点击“新增任务”。
    • 设置任务信息,选择运行模式为“GLUE模式(Java)”。
    • 在GLUE IDE中编写如下代码:
java
package com.xxl.job.service.handler;
 
import cn.wolfcode.xxljobdemo.service.HelloService;
import com.xxl.job.core.handler.IJobHandler;
import org.springframework.beans.factory.annotation.Autowired;
 
public class DemoGlueJobHandler extends IJobHandler {
    @Autowired
    private HelloService helloService;
 
    @Override
    public void execute() throws Exception {
        helloService.methodA();
    }
}

注意:这里的包名和类名需要根据实际情况调整,确保能够正确注入HelloService

  1. 保存并启动任务
    • 保存GLUE脚本并编译。
    • 在任务管理页面启动任务。
    • 观察Spring Boot项目的控制台输出,应该可以看到“执行MethodA的方法”被定时打印出来。

四、负载均衡策略

在集群环境中,为了确保定时任务不会重复执行,XXL-JOB提供了多种路由策略。常见的路由策略包括:

  • FIRST:固定选择第一个机器。
  • LAST:固定选择最后一个机器。
  • ROUND:轮询选择在线的机器。
  • RANDOM:随机选择在线的机器。
  • CONSISTENT_HASH:每个任务按照Hash算法固定选择某一台机器。

根据实际需求选择合适的路由策略即可。如果需要修改路由策略,需要停止任务并重新启动。

综上所述,XXL-JOB的GLUE模式提供了一种灵活且高效的定时任务配置方式。通过在线编辑源码并实时编译,开发者可以快速部署和测试定时任务,而无需重新发布整个应用。

实验

创建业务 service HelloService

java
package com.future.demo.service;

import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;

@Service
@Slf4j
public class HelloService {
    public void methodA() {
        log.debug("执行MethodA的方法");
    }

    public void methodB() {
        log.debug("执行MethodB的方法");
    }
}

登录 xxl-job 控制台 http://localhost:8080/xxl-job-admin/jobinfo 新增一个 Glue 类型的 job,job 信息如下:

  • 执行器选择示例执行器
  • 负责人填写xxl-job
  • 任务描述填写xxl-job
  • 调度类型选择CRON
  • Cron 表达式使用编辑器编辑到每5秒执行
  • 运行模式选择GLUE(Java)
  • 路由策略选择第一个

点击保存按钮新增 job 后,选中以上创建的 job 点击功能操作>GLUE IDE以编辑 job 代码。

创建 Glue Java 代码调用业务 service HelloService

java
package com.xxl.job.service.handler;

import com.future.demo.service.HelloService;
import com.xxl.job.core.handler.IJobHandler;
import org.springframework.beans.factory.annotation.Autowired;
 
public class DemoGlueJobHandler extends IJobHandler {
    @Autowired
    private HelloService helloService;
 
    @Override
    public void execute() throws Exception {
        helloService.methodA();
    }
}

选中以上创建的 job 点击功能操作>执行一次以调试 job。

执行器集群

路由策略

XXL-JOB提供了多种路由策略,用于在分布式任务调度中决定任务如何在不同的执行器之间进行分配。以下是XXL-JOB的主要路由策略:

  1. FIRST(第一个)
    • 固定选择第一个机器。当执行器集群部署时,框架会选择第一个注册的执行器作为任务执行的目标机器。
    • 这种策略适用于执行器数量较少且集群部署较为稳定的情况。
  2. LAST(最后一个)
    • 固定选择最后一个机器。与FIRST策略相反,LAST策略会始终选择最后注册的执行器作为任务执行的目标机器。
    • 这种策略适用于执行器数量较多且集群部署不稳定的情况。
  3. ROUND(轮询)
    • 按照执行器注册地址轮询分配任务。当有多个执行器可用时,框架会按照轮询的方式依次选择执行器执行任务。
    • 这种策略可以保证任务的均匀分配,提高系统的稳定性和可靠性。
  4. RANDOM(随机)
    • 随机选择在线的机器。框架会随机选择一个在线的执行器执行任务。
    • 这种策略适用于任务执行时间较短且对执行顺序无要求的情况。
  5. CONSISTENT_HASH(一致性HASH)
    • 每个任务按照Hash算法固定选择某一台机器。这种策略通过一致性HASH算法将每个任务固定映射到一台执行器上,确保任务的均匀分布和负载均衡。
    • 当一台执行器出现故障时,框架会自动调整任务的分配,将故障机器上的任务重新分配给健康的执行器。
  6. LEAST_FREQUENTLY_USED(最不经常使用LFU)
    • 优先选择使用频率最低的那台机器。这种策略通过记录每个执行器的使用频率,优先选择使用频率最低的机器执行任务,以实现负载均衡和资源的有效利用。
  7. LEAST_RECENTLY_USED(最近最少使用LRU)
    • 选择最近最少被使用的机器进行任务分配。
  8. FAILOVER(故障转移)
    • 如果任务在某个执行器上执行失败,会尝试在其他执行器上执行。这种策略提高了任务的可靠性,确保在出现故障时能够尽快恢复执行。
  9. BUSYOVER(忙碌转移)
    • 如果任务在某个执行器上因为忙碌(如CPU使用率过高)而无法及时执行,会尝试将任务转移到其他空闲的执行器上执行。这种策略有助于优化资源利用和任务执行效率。
  10. SHARDING_BROADCAST(分片广播)
    • 将任务分片,然后在所有执行器上广播执行。这种策略适用于需要大规模并行处理的任务场景,可以显著提高任务处理的速度和效率。

这些路由策略可以根据实际的业务需求和执行器的部署情况来选择,以实现高效的任务调度和负载均衡。在配置路由策略时,可以在XXL-JOB的调度中心对任务进行配置,选择适合的路由策略来优化任务的执行。

实验

启用两个执行器应用,第二个 SpringBoot VM Options 添加 -Dserver.port=18081 -Dxxl.job.executor.port=10000 配置后启动。

登录 xxl-job 控制台 http://localhost:8080/xxl-job-admin/jobgroup 查看 OnLine 机器地址 列显示两个执行器。

登录 xxl-job 控制台 http://localhost:8080/xxl-job-admin/jobinfo 修改 job 路由策略轮询

观察各个执行器控制台是否交替输出日志。

分片广播

XXL-JOB是一个分布式任务调度平台,它提供了分片广播这一功能,以下是对XXL-JOB分片广播的详细介绍:

一、分片广播的定义

分片广播是XXL-JOB的一种路由策略,它允许调度中心将任务广播到集群中的所有执行器,同时系统自动传递分片参数。每个执行器都会接收到任务并执行,根据分片参数处理相应的数据分片。这种方式类似于消息队列(MQ)的广播模式,适用于需要在多个执行器上同时执行相同任务的场景,如数据同步或分布式计算等。

二、分片广播的实现原理

  1. 任务配置与分发
    • 在XXL-JOB的调度中心,用户通过Web界面创建一个分片广播类型的任务,并设置相应的参数,如分片总数(shardingTotalCount)。
  2. 分片参数传递
    • 每个执行器在接收到广播的任务时,会自动获得分片参数,包括分片总数和当前执行器应该处理的分片序号(shardingItem)。这些参数由XXL-JOB框架自动注入,使得执行器能够知道它应当处理哪个数据分片。
  3. 分片逻辑执行
    • 实际的分片逻辑需要在执行器的任务处理器代码中实现。开发者需根据分片序号和总数,决定处理哪些数据。这通常涉及对数据源的分片访问,如数据库查询时使用分页查询或者ID取模等方法来确定每个执行器处理的数据范围。
  4. 结果汇总
    • 由于是广播任务,每个执行器处理的是全量数据的一个子集,因此不存在汇总操作。每个执行器独立完成自己的处理逻辑。如果需要最终汇总结果,需要额外的逻辑来收集和整合各个执行器的输出。

三、分片广播的使用场景

分片广播适用于以下场景:

  1. 数据同步:需要将数据从一个系统同步到另一个系统,且数据量较大时,可以使用分片广播将任务分发到多个执行器上并行处理。
  2. 分布式计算:需要进行大规模的计算任务,且计算过程可以拆分为多个子任务并行执行时,可以使用分片广播将任务分发到多个执行器上进行处理。
  3. 批量处理:需要处理大量的数据记录,且每条记录的处理过程相互独立时,可以使用分片广播将任务分发到多个执行器上进行批量处理。

四、分片广播的示例代码

以下是一个使用XXL-JOB进行分片广播的示例代码:

java
import com.xxl.job.core.context.XxlJobHelper;
import com.xxl.job.core.handler.annotation.XxlJob;
import org.springframework.stereotype.Component;
import java.util.List;
 
@Component
public class ShardingBroadcastJob {
 
    @XxlJob("shardingBroadcastTask")
    public void execute(String param) {
        // 获取分片参数:分片总数和分片序列号
        int shardIndex = XxlJobHelper.getShardIndex();
        int shardTotal = XxlJobHelper.getShardTotal();
 
        // 模拟获取数据列表
        List<String> dataList = getDataList();
 
        // 执行分片逻辑
        shardingExecute(dataList, shardIndex, shardTotal);
    }
 
    public List<String> getDataList() {
        // 这里可以根据实际情况从数据库或其他数据源获取数据列表
        // 为了示例简单,直接返回一个固定的列表
        return List.of("data1", "data2", "data3", "data4", "data5", "data6", "data7", "data8", "data9", "data10");
    }
 
    public void shardingExecute(List<String> dataList, int shardIndex, int shardTotal) {
        // 计算当前分片应处理的数据范围
        int start = (shardIndex * dataList.size()) / shardTotal;
        int end = ((shardIndex + 1) * dataList.size()) / shardTotal;
 
        // 处理当前分片的数据
        for (int i = start; i < end; i++) {
            String data = dataList.get(i);
            // 在此处添加具体的数据处理逻辑
            System.out.println("处理数据: " + data);
        }
    }
}

在上述代码中,ShardingBroadcastJob类定义了一个分片广播任务。在execute方法中,通过XxlJobHelper.getShardIndex()获取当前分片序号,通过XxlJobHelper.getShardTotal()获取总分片数。然后模拟获取了一个数据列表dataList,接下来使用shardingExecute方法执行分片逻辑。

五、注意事项

  1. 任务幂等性:由于分片广播会将任务分发到多个执行器上执行,因此需要确保任务是幂等的,即多次执行同一任务不会导致数据不一致或重复处理。
  2. 执行器负载:在使用分片广播时,需要考虑执行器的负载情况。如果某个执行器负载过高,可能会导致任务执行延迟或失败。因此,需要合理配置执行器数量和资源。
  3. 任务超时:对于执行时间较长的任务,需要设置合理的超时时间,以避免任务因超时而被中断。

综上所述,XXL-JOB的分片广播功能为分布式任务调度提供了灵活而强大的支持。通过合理的配置和使用,可以显著提高任务处理的效率和可靠性。

六、实验

分片广播 job

java
/**
 * 2、分片广播任务
 */
@XxlJob("shardingJobHandler")
public void shardingJobHandler() throws Exception {

    // 分片参数
    int shardIndex = XxlJobHelper.getShardIndex();
    int shardTotal = XxlJobHelper.getShardTotal();

    XxlJobHelper.log("分片参数:当前分片序号 = {}, 总分片数 = {}", shardIndex, shardTotal);
    log.debug("分片参数:当前分片序号 = {}, 总分片数 = {}", shardIndex, shardTotal);
}

创建 job 选择路由策略为 分片广播,点击 执行一次 调试 job 分片总数和分片序列号是否正确。

elstaic-job

注意:初步判断 elastic-job 和 SpringBoot 集成麻烦,暂时不研究。

介绍

Elastic-Job 是一个分布式调度框架,用于解决单机定时任务在高可用性和水平扩展性方面的不足。它支持多种任务类型,并提供丰富的功能来管理和监控分布式任务。 主要有两种版本:ElasticJob-Lite 和 ElasticJob-Cloud。

ElasticJob-Lite:

  • 轻量级: 依赖较少,易于集成到现有项目中。
  • 功能完整: 提供了必要的分布式任务调度功能,例如作业分片、失效转移、任务监控等。
  • 部署灵活: 支持多种部署方式,例如 ZooKeeper 和数据库。
  • 适合小型项目: 对于不需要复杂云端管理功能的项目,ElasticJob-Lite 是一个不错的选择。

ElasticJob-Cloud:

  • 云原生: 基于云原生架构,提供更强大的管理和监控功能。
  • 集中管理: 提供一个中央控制台来管理所有分布式任务。
  • 更高级功能: 例如,任务的动态配置、自动伸缩、告警等。
  • 适合大型项目: 对于需要集中管理和监控大量分布式任务的项目,ElasticJob-Cloud 是一个更好的选择。

核心概念:

  • 作业(Job): 需要定时执行的任务。
  • 作业分片(Job Sharding): 将一个作业拆分成多个分片,分配到不同的机器上执行,提高并行处理能力。
  • 作业服务器(Job Server): 执行作业的机器。
  • 注册中心(Registry Center): 用于存储作业信息和协调作业执行的中心服务器,通常使用 ZooKeeper 或数据库。
  • 失效转移(Failover): 当一个作业服务器出现故障时,将该服务器上的作业分片转移到其他服务器上执行,保证任务的可靠性。

Elastic-Job 的主要功能:

  • 作业分片: 将作业分解成多个分片,并行执行,提高效率。
  • 失效转移: 保证任务的高可用性。
  • 任务监控: 提供监控界面,查看任务的执行状态。
  • 任务管理: 可以方便地管理和控制任务。
  • 数据一致性: 提供机制保证数据的一致性,避免数据冲突。
  • 多种任务类型支持: 包括简单任务、数据流式任务等。

两种版本的对比:

特性ElasticJob-LiteElasticJob-Cloud
架构轻量级,单机部署或简单集群部署云原生,分布式架构
管理方式通过配置或API管理提供中央控制台进行集中管理
功能基本分布式调度功能更高级功能,例如动态配置、自动伸缩
部署复杂度简单较复杂
可扩展性受限于单机或简单集群的性能较好,可以水平扩展
适用场景小型项目大型项目,需要集中管理和监控

选择哪个版本?

  • 对于小型项目,或者对集中管理和高级功能要求不高,可以选择 ElasticJob-Lite。
  • 对于大型项目,或者需要集中管理和监控大量分布式任务,需要高可用性和弹性伸缩能力,则选择 ElasticJob-Cloud。

总结:

Elastic-Job 是一个功能强大的分布式任务调度框架,它可以帮助你轻松地构建和管理分布式定时任务。 选择哪个版本取决于你的具体需求和项目规模。 建议阅读 ElasticJob 的官方文档以获取更多信息。