文章

Redisson 延时队列实现分布式定时任务

在现代分布式系统中,定时任务的实现往往需要考虑高可用、可扩展性和一致性等问题。本文将介绍如何使用 Redisson 的延时队列来实现一个分布式定时任务系统,以解决传统定时任务在分布式环境下面临的挑战。

背景

在我们的物流系统中,需要定期分析线路延迟情况并划分延迟原因。这个过程涉及到实时交通数据的获取、车辆轨迹的分析以及大量的计算工作。传统的单机定时任务无法满足高并发和故障转移的需求,因此我们选择使用 Redisson 的延时队列来实现一个分布式的定时任务系统。

Redisson 延时队列简介

Redisson 是一个基于 Redis 的分布式服务框架,它提供了许多分布式原语和工具,其中包括延时队列(Delayed Queue)。Redisson 的延时队列具有以下特点:

  1. 分布式:可以在多个节点间共享和处理任务。
  2. 可靠性:基于 Redis 的持久化机制,保证任务不会丢失。
  3. 精确的调度:可以精确到毫秒级的任务调度。
  4. 动态伸缩:可以动态增加或减少工作节点。

实现架构

我们的分布式定时任务系统主要包含以下几个组件:

  1. RedisDelayQueueProducer:负责向延时队列中添加任务。
  2. RedisDelayedQueueInit:初始化队列监听器,负责启动消费者线程。
  3. RedisDelayedQueueListener:定义了任务处理的接口。
  4. Job:实现了 RedisDelayedQueueListener 接口,包含具体的业务逻辑。

核心代码解析

1. 延时队列生产者

这个类负责将任务添加到延时队列中。它使用 Redisson 的 RDelayedQueue 来实现延时功能。

@Slf4j
@Component
@RequiredArgsConstructor
public class RedisDelayQueueProducer {
    private static final StringCodec stringCodec = new StringCodec(StandardCharsets.UTF_8);
    private final RedissonClient redissonClient;

    /**
     * 添加延迟队列
     */
    public <T> boolean addDelayQueue(@NonNull T t, long delay, @NonNull TimeUnit timeUnit, @NonNull String queueName) {
        if (StringUtils.isBlank(queueName)) {
            return false;
        }
        try {
            // 获取阻塞队列
            RBlockingDeque<Object> blockingDeque = redissonClient.getBlockingDeque(queueName, stringCodec);
            // 获取延时队列
            RDelayedQueue<Object> delayedQueue = redissonClient.getDelayedQueue(blockingDeque);
            // 转换为 JSON 字符串并添加到队列
            String value = JSON.toJSONString(t);
            delayedQueue.offer(value, delay, timeUnit);
        } catch (Exception e) {
            log.error("【添加延时队列失败】, 队列名:{}, {}", queueName, e.getMessage());
            throw new RuntimeException("(添加延时队列失败)");}
        return true;
    }

    /**
     * 获取一个延时阻塞队列
     */
    public RBlockingQueue<String> getDelayedBlockingQueue(String queueName, long delay, TimeUnit timeUnit){RBlockingQueue<String> blockingFairQueue = redissonClient.getBlockingQueue(queueName, stringCodec);
        RDelayedQueue<String> delayedQueue = redissonClient.getDelayedQueue(blockingFairQueue);
        delayedQueue.offer("", delay, timeUnit);
        return blockingFairQueue;
    }
}

2. 队列监听器接口

这个接口定义了任务处理的方法和获取队列枚举的方法。

public interface RedisDelayedQueueListener {
    /**
     * 处理消息
     */
    void invoke(String msg) throws Exception;

    /**
     * 获取队列枚举
     */
    RedisDelayQueueEnum getRedisDelayQueueEnum();}

3. 队列初始化器

这个类负责初始化和启动所有的队列监听器。它在应用启动时自动运行,为每个 RedisDelayedQueueListener 创建一个独立的线程来监听队列。

@RequiredArgsConstructor
@Slf4j
@Component
@Order(10)
public class RedisDelayedQueueInit implements ApplicationContextAware, CommandLineRunner {
    
    // 存储任务 Future 的 Map
    private final Map<RedisDelayQueueEnum, Future<Void>> futureTaskMap = new HashMap<>();
    
    private final ThreadPoolTaskExecutor threadPoolTaskExecutor;
    private final RedisDelayQueueProducer redisDelayQueueProducer;
    // 存储所有监听器的 Map
    private Map<String, RedisDelayedQueueListener> taskListeners;

    /**
     * 获取应用上下文并获取相应的接口实现类
     */
    @Override
    public void setApplicationContext(@NonNull ApplicationContext applicationContext) throws BeansException {
        this.taskListeners = applicationContext.getBeansOfType(RedisDelayedQueueListener.class);
    }

    /**
     * 启动监听器
     */
    public void start(String taskFlag) {
        if ("1".equals(taskFlag) && !taskListeners.isEmpty()){taskListeners.values().forEach(listener -> {Future<Void> future = startJob(listener);
                futureTaskMap.put(listener.getRedisDelayQueueEnum(), future);});
        }
    }

    /**
     * 启动单个监听任务
     */
    private Future<Void> startJob(RedisDelayedQueueListener redisDelayedQueueListener) {
        String queueName = redisDelayedQueueListener.getRedisDelayQueueEnum().getQueueName();
        RBlockingQueue<String> blockingFairQueue = redisDelayQueueProducer.getDelayedBlockingQueue(
            queueName, 3, TimeUnit.SECONDS);

        return ThreadUtil.execAsync(() -> {
            log.info("【启动消息队列线程】, 消息队列名:{}", queueName);
            while (true) {
                try {
                    // 获取队列消息
                    String value = blockingFairQueue.take();
                    log.info("消息队列名:{}, 获取到消息:{}", queueName, value);
                    if(StringUtils.isBlank(value)) continue;
                    
                    // 异步处理消息
                    CompletableFuture.runAsync(() -> {
                        try {redisDelayedQueueListener.invoke(value);
                        } catch (Exception e) {
                            log.error("【消息消费失败】 消息队列名:{}, 获取到消息:{}", queueName, value, e);}
                    }, threadPoolTaskExecutor);} catch (InterruptedException ie) {
                    log.error("监听队列中断 queueName={}", queueName);
                    throw ie;
                } catch (Exception e) {
                    log.error("{} 监听队列线程错误,", queueName, e);
                    ThreadUtil.sleep(3000);}
            }
        });}

    @Override
    public void run(String... args) {
        start("1");}
}

4. 任务监听器实现示例

这个类实现了具体的业务逻辑,包括处理任务和重新调度任务。

@Component
@Slf4j
@RequiredArgsConstructor
public class DelayedTaskListener implements RedisDelayedQueueListener {
    
    @Override
    public void invoke(String msg) throws Exception {
        // 1. 解析任务信息
        TaskInfo taskInfo = JSON.parseObject(msg, TaskInfo.class);
        
        try {
            // 2. 执行任务逻辑
            processTask(taskInfo);
            
            // 3. 如果需要重复执行,重新加入队列
            if (taskInfo.isRepeat()) {scheduleNextTask(taskInfo);
            }
        } catch (Exception e) {
            log.error("Task execution failed: {}", e.getMessage(), e);
        }
    }

    @Override
    public RedisDelayQueueEnum getRedisDelayQueueEnum() {
        return RedisDelayQueueEnum.TASK_QUEUE;
    }
}

5. 队列枚举定义

public enum RedisDelayQueueEnum {
    TASK_QUEUE("DELAYED_TASK_QUEUE");
    
    private final String queueName;
    
    RedisDelayQueueEnum(String queueName) {
        this.queueName = queueName;
    }
    
    public String getQueueName() {
        return queueName;
    }
}

工作流程

  1. 系统启动时,RedisDelayedQueueInit 初始化所有的队列监听器。
  2. 当需要创建一个新的定时任务时,调用 RedisDelayQueueProducer.addDelayQueue() 方法将任务添加到延时队列中。
  3. RedisDelayedQueueInit 中的消费者线程不断从队列中获取到期的任务。
  4. 当任务到期时,系统将任务分发给对应的 RedisDelayedQueueListener 实现类(如 CauseDivisionScheduleJob)进行处理。
  5. 任务处理完成后,如果需要重复执行,可以再次将任务添加到延时队列中。

优势

  1. 高可用性:由于使用了 Redis 作为底层存储,即使某个节点失败,任务也不会丢失,可以被其他节点处理。
  2. 精确调度:Redisson 的延时队列支持毫秒级的调度精度。
  3. 动态扩展:可以动态增加或减少工作节点,系统会自动平衡负载。
  4. 一致性:使用 Redis 作为中心化的任务存储,避免了分布式环境下的数据不一致问题。

注意事项

  1. Redis 的可用性:由于系统严重依赖 Redis,需要确保 Redis 的高可用性,可以考虑使用 Redis 集群。
  2. 任务幂等性:在分布式环境中,可能会出现任务重复执行的情况,需要在业务逻辑中保证任务的幂等性。
  3. 监控和告警:建议对队列长度、任务执行时间等指标进行监控,以便及时发现和解决问题。

总结

通过使用 Redisson 的延时队列,我们成功实现了一个可靠的分布式定时任务系统。这个系统不仅满足了高并发和故障转移的需求,还提供了灵活的任务调度能力。在实际的物流线路延迟分析场景中,这个系统表现出色,能够有效地处理大量的实时数据和复杂的分析任务。

对于需要在分布式环境中实现可靠定时任务的项目,Redisson 的延时队列是一个值得考虑的解决方案。它不仅简化了开发过程,还为系统带来了更高的可靠性和可扩展性。

这篇博客文章概述了如何使用 Redisson 延时队列实现分布式定时任务,并结合了您项目中的实际应用场景。如果您需要更详细的信息或者对特定部分有疑问,请随时告诉我。

License:  CC BY 4.0