Redisson 延时队列实现分布式定时任务
在现代分布式系统中,定时任务的实现往往需要考虑高可用、可扩展性和一致性等问题。本文将介绍如何使用 Redisson 的延时队列来实现一个分布式定时任务系统,以解决传统定时任务在分布式环境下面临的挑战。
背景
在我们的物流系统中,需要定期分析线路延迟情况并划分延迟原因。这个过程涉及到实时交通数据的获取、车辆轨迹的分析以及大量的计算工作。传统的单机定时任务无法满足高并发和故障转移的需求,因此我们选择使用 Redisson 的延时队列来实现一个分布式的定时任务系统。
Redisson 延时队列简介
Redisson 是一个基于 Redis 的分布式服务框架,它提供了许多分布式原语和工具,其中包括延时队列(Delayed Queue)。Redisson 的延时队列具有以下特点:
- 分布式:可以在多个节点间共享和处理任务。
- 可靠性:基于 Redis 的持久化机制,保证任务不会丢失。
- 精确的调度:可以精确到毫秒级的任务调度。
- 动态伸缩:可以动态增加或减少工作节点。
实现架构
我们的分布式定时任务系统主要包含以下几个组件:
- RedisDelayQueueProducer:负责向延时队列中添加任务。
- RedisDelayedQueueInit:初始化队列监听器,负责启动消费者线程。
- RedisDelayedQueueListener:定义了任务处理的接口。
- Job:实现了 RedisDelayedQueueListener 接口,包含具体的业务逻辑。
核心代码解析
1. 延时队列生产者
这个类负责将任务添加到延时队列中。它使用 Redisson 的 RDelayedQueue 来实现延时功能。
@Slf4j
@Component
@RequiredArgsConstructor
public class RedisDelayQueueProducer {
private static final StringCodec stringCodec = new StringCodec(StandardCharsets.UTF_8);
private final RedissonClient redissonClient;
/**
* 添加延迟队列
*/
public <T> boolean addDelayQueue(@NonNull T t, long delay, @NonNull TimeUnit timeUnit, @NonNull String queueName) {
if (StringUtils.isBlank(queueName)) {
return false;
}
try {
// 获取阻塞队列
RBlockingDeque<Object> blockingDeque = redissonClient.getBlockingDeque(queueName, stringCodec);
// 获取延时队列
RDelayedQueue<Object> delayedQueue = redissonClient.getDelayedQueue(blockingDeque);
// 转换为 JSON 字符串并添加到队列
String value = JSON.toJSONString(t);
delayedQueue.offer(value, delay, timeUnit);
} catch (Exception e) {
log.error("【添加延时队列失败】, 队列名:{}, {}", queueName, e.getMessage());
throw new RuntimeException("(添加延时队列失败)");}
return true;
}
/**
* 获取一个延时阻塞队列
*/
public RBlockingQueue<String> getDelayedBlockingQueue(String queueName, long delay, TimeUnit timeUnit){RBlockingQueue<String> blockingFairQueue = redissonClient.getBlockingQueue(queueName, stringCodec);
RDelayedQueue<String> delayedQueue = redissonClient.getDelayedQueue(blockingFairQueue);
delayedQueue.offer("", delay, timeUnit);
return blockingFairQueue;
}
}
2. 队列监听器接口
这个接口定义了任务处理的方法和获取队列枚举的方法。
public interface RedisDelayedQueueListener {
/**
* 处理消息
*/
void invoke(String msg) throws Exception;
/**
* 获取队列枚举
*/
RedisDelayQueueEnum getRedisDelayQueueEnum();}
3. 队列初始化器
这个类负责初始化和启动所有的队列监听器。它在应用启动时自动运行,为每个 RedisDelayedQueueListener 创建一个独立的线程来监听队列。
@RequiredArgsConstructor
@Slf4j
@Component
@Order(10)
public class RedisDelayedQueueInit implements ApplicationContextAware, CommandLineRunner {
// 存储任务 Future 的 Map
private final Map<RedisDelayQueueEnum, Future<Void>> futureTaskMap = new HashMap<>();
private final ThreadPoolTaskExecutor threadPoolTaskExecutor;
private final RedisDelayQueueProducer redisDelayQueueProducer;
// 存储所有监听器的 Map
private Map<String, RedisDelayedQueueListener> taskListeners;
/**
* 获取应用上下文并获取相应的接口实现类
*/
@Override
public void setApplicationContext(@NonNull ApplicationContext applicationContext) throws BeansException {
this.taskListeners = applicationContext.getBeansOfType(RedisDelayedQueueListener.class);
}
/**
* 启动监听器
*/
public void start(String taskFlag) {
if ("1".equals(taskFlag) && !taskListeners.isEmpty()){taskListeners.values().forEach(listener -> {Future<Void> future = startJob(listener);
futureTaskMap.put(listener.getRedisDelayQueueEnum(), future);});
}
}
/**
* 启动单个监听任务
*/
private Future<Void> startJob(RedisDelayedQueueListener redisDelayedQueueListener) {
String queueName = redisDelayedQueueListener.getRedisDelayQueueEnum().getQueueName();
RBlockingQueue<String> blockingFairQueue = redisDelayQueueProducer.getDelayedBlockingQueue(
queueName, 3, TimeUnit.SECONDS);
return ThreadUtil.execAsync(() -> {
log.info("【启动消息队列线程】, 消息队列名:{}", queueName);
while (true) {
try {
// 获取队列消息
String value = blockingFairQueue.take();
log.info("消息队列名:{}, 获取到消息:{}", queueName, value);
if(StringUtils.isBlank(value)) continue;
// 异步处理消息
CompletableFuture.runAsync(() -> {
try {redisDelayedQueueListener.invoke(value);
} catch (Exception e) {
log.error("【消息消费失败】 消息队列名:{}, 获取到消息:{}", queueName, value, e);}
}, threadPoolTaskExecutor);} catch (InterruptedException ie) {
log.error("监听队列中断 queueName={}", queueName);
throw ie;
} catch (Exception e) {
log.error("{} 监听队列线程错误,", queueName, e);
ThreadUtil.sleep(3000);}
}
});}
@Override
public void run(String... args) {
start("1");}
}
4. 任务监听器实现示例
这个类实现了具体的业务逻辑,包括处理任务和重新调度任务。
@Component
@Slf4j
@RequiredArgsConstructor
public class DelayedTaskListener implements RedisDelayedQueueListener {
@Override
public void invoke(String msg) throws Exception {
// 1. 解析任务信息
TaskInfo taskInfo = JSON.parseObject(msg, TaskInfo.class);
try {
// 2. 执行任务逻辑
processTask(taskInfo);
// 3. 如果需要重复执行,重新加入队列
if (taskInfo.isRepeat()) {scheduleNextTask(taskInfo);
}
} catch (Exception e) {
log.error("Task execution failed: {}", e.getMessage(), e);
}
}
@Override
public RedisDelayQueueEnum getRedisDelayQueueEnum() {
return RedisDelayQueueEnum.TASK_QUEUE;
}
}
5. 队列枚举定义
public enum RedisDelayQueueEnum {
TASK_QUEUE("DELAYED_TASK_QUEUE");
private final String queueName;
RedisDelayQueueEnum(String queueName) {
this.queueName = queueName;
}
public String getQueueName() {
return queueName;
}
}
工作流程
- 系统启动时,RedisDelayedQueueInit 初始化所有的队列监听器。
- 当需要创建一个新的定时任务时,调用 RedisDelayQueueProducer.addDelayQueue() 方法将任务添加到延时队列中。
- RedisDelayedQueueInit 中的消费者线程不断从队列中获取到期的任务。
- 当任务到期时,系统将任务分发给对应的 RedisDelayedQueueListener 实现类(如 CauseDivisionScheduleJob)进行处理。
- 任务处理完成后,如果需要重复执行,可以再次将任务添加到延时队列中。
优势
- 高可用性:由于使用了 Redis 作为底层存储,即使某个节点失败,任务也不会丢失,可以被其他节点处理。
- 精确调度:Redisson 的延时队列支持毫秒级的调度精度。
- 动态扩展:可以动态增加或减少工作节点,系统会自动平衡负载。
- 一致性:使用 Redis 作为中心化的任务存储,避免了分布式环境下的数据不一致问题。
注意事项
- Redis 的可用性:由于系统严重依赖 Redis,需要确保 Redis 的高可用性,可以考虑使用 Redis 集群。
- 任务幂等性:在分布式环境中,可能会出现任务重复执行的情况,需要在业务逻辑中保证任务的幂等性。
- 监控和告警:建议对队列长度、任务执行时间等指标进行监控,以便及时发现和解决问题。
总结
通过使用 Redisson 的延时队列,我们成功实现了一个可靠的分布式定时任务系统。这个系统不仅满足了高并发和故障转移的需求,还提供了灵活的任务调度能力。在实际的物流线路延迟分析场景中,这个系统表现出色,能够有效地处理大量的实时数据和复杂的分析任务。
对于需要在分布式环境中实现可靠定时任务的项目,Redisson 的延时队列是一个值得考虑的解决方案。它不仅简化了开发过程,还为系统带来了更高的可靠性和可扩展性。
这篇博客文章概述了如何使用 Redisson 延时队列实现分布式定时任务,并结合了您项目中的实际应用场景。如果您需要更详细的信息或者对特定部分有疑问,请随时告诉我。