RocketMQ 消费进度持久化

代码纪元 后端 2024-09-20

RocketMQ 消费进度持久化

消费进度文件内容

消息消费完毕,如何保持消费进度呢?带着这个疑问,来看下 RocketMQ 的实现。

RocketMQ 保存消费偏移的文件位置在 ${user.home}/store/config 目录下。

consumerOffset.json

保存正常消费的消费进度,来看下,文件里面的内容。json

代码解读
复制代码
{ "offsetTable":{ "%RETRY%TestConsumer@TestConsumer":{0:0}, "TopicTest@TestConsumer":{0:1,1:2,2:1,3:0} } }

${Topic}@${ConsumerGroup} 格式是正常的消费者。

%Retry%${Topic}@${ConsumerGroup} 格式是重试主题。

{1:2} 则表示 1 号队列已经消费完 2 条消息。

在讲解 ConsumerQueue 时,我提到过它的子条目是定长的,所以在存储消费偏移时,可以存储消费到第几个子条目。

delayOffset.json

保存延迟消息的消费进度,文件的内容如下json

代码解读
复制代码
{ "offsetTable":{1:14,3:10,4:10,5:10,6:10,7:10,8:10,9:0} }

{4:10} 表示:延迟等级为 4 的消费完了 10 个。

讲解完,消费偏移在文件上怎么存储的,我们看下 RocketMQ 在什么时候会消费进度持久化。

消费进度持久化代码实现

正常的普通消息

Broker 每隔 5s 持久化消费偏移。

代码位置:

BrokerController#initialize()java

代码解读
复制代码
public boolean initialize() throws CloneNotSupportedException { this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { try { BrokerController.this.consumerOffsetManager.persist(); } catch (Throwable e) { log.error("schedule persist consumerOffset error.", e); } } }, 1000 * 10, this.brokerConfig.getFlushConsumerOffsetInterval()/* 默认5000 */, TimeUnit.MILLISECONDS); }

延迟消息

Broker 每隔 10s 就会将 延迟消息消费偏移 持久化。

代码位置:ScheduleMessageService#start()java

代码解读
复制代码
public class ScheduleMessageService extends ConfigManager { public void start() { if (started.compareAndSet(false, true)) { this.timer = new Timer("ScheduleMessageTimerThread", true); // 获取配置的延迟等级 for (Map.Entry<Integer, Long> entry : this.delayLevelTable.entrySet()) { Integer level = entry.getKey(); Long timeDelay = entry.getValue(); Long offset = this.offsetTable.get(level); if (null == offset) { offset = 0L; } // 初始化延迟调度任务 if (timeDelay != null) { this.timer.schedule(new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME); } } // 默认每隔 10s,执行一次持久化 this.timer.scheduleAtFixedRate(new TimerTask() { @Override public void run() { try { if (started.get()) ScheduleMessageService.this.persist(); } catch (Throwable e) { log.error("scheduleAtFixedRate flush exception", e); } } }, 10000, this.defaultMessageStore.getMessageStoreConfig().getFlushDelayOffsetInterval()/*默认 10000 */); } } }

转载来源:https://juejin.cn/post/6993870092572295181

Apipost 私有化火热进行中

评论