RocketMQ源码分析:Consumer消费偏移量
基于rocketmq-4.9.0 版本分析rocketmq
1.什么是消费偏移量offset?
我们先看一幅图

消费偏移量offset就是记录消费者的消费进度的。也是rocketmq保证消息不会重复消费的核心(当然,极端情况下还是可能会导致重复消费)。
consumequeue中一个消息的索引单元就是一个offset值。
在分析rocketmq的消费者是如何利用这个offset完成消息消费的之前,我们先看下broker端是如何管理这些offset值的。
服务端管理offset
这里的服务端就是broker
- broker在初始化(initialize())时会通过消费者offset管理类ConsumerOffsetManager来加载配置文件中的offset值,然后设置到offsetTable属性中。
public class ConsumerOffsetManager extends ConfigManager { //TODO: key = topic@group //TODO: value = Map[key = queueId, value = offset] private ConcurrentMap<String/* topic@group */, ConcurrentMap<Integer, Long>> offsetTable = new ConcurrentHashMap<String, ConcurrentMap<Integer, Long>>(512); //TOOD:...other }
offset文件默认路径:$user.home/store/config/consumerOffset.json 文件内容例子:
{ "offsetTable":{ //topic@consumerGroupName "batchTopic@test_cg_batch":{0:0,1:0,2:0,3:1 }, //key是queueid,value是offset值,就是我们今天讨论的主角 "ordered_topic@CG":{0:0,1:15,2:0,3:35 }, "qiuguan_topic@qiuguan_group_1":{0:2533,1:2534,2:2531,3:2531 }, "hacker_topic@fuyuanhui_group_2":{0:64035,1:64034,2:64034,3:64034 }, "qiuguan_topic_2@qiuguan_group":{0:2,1:1,2:7,3:6 }, "qiuguan_topic@qiuguan_group":{0:2533,1:2534,2:2531,3:2531 } } }
- 消费者消费后,会将offset发送到broker,这里会先写入到上面的消费者offset管理类ConsumerOffsetManager的offsetTable中,然后通过定时任务将其刷盘到文件中。属性中。然后通过3将数据持久化到文件中
稍后我们分析消费者时还会看到
- broker在初始化(initialize())时,会启动定时任务,每隔5秒执行一次初始化,将ConsumerOffsetManager的offsetTable属性中的值持久化到文件中。
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { try { BrokerController.this.consumerOffsetManager.persist(); } catch (Throwable e) { log.error("schedule persist consumerOffset error.", e); } } }, 1000 * 10, 1000 * 5, TimeUnit.MILLISECONDS);
那么服务端对于offset值的管理大致就这些,那么我们来看下消费者是如何利用offset来进行消息消费的。
总的来说就是,消费者定时的将消费过的offset值上传到broker的内存offsetTable中,然后通过定时任务将其刷盘到文件中。
那么接下来就看看消费者是如何使用这个offset值的。
3.消费者使用offset
3.1 消费者初始化offset
前面在分析consumer消费时我们知道,它会启动一个消息拉取服务PullMessageService对象,还有一个是在拉取消息之前要完成的重平衡RebalanceService对象。offset初始化就和重平衡息息相关,那么我们就看下重平衡是如何完成offset初始化的。
我们这里还是只讨论集群消费模式。它和广播模式的区别就是,广播模式每个消费者都要消费topic下的所有队列,集群模式通过分配算法(默认是平均)来将topic下的所有队列分配给消费者。既然这里我们主要讨论的是offset,那么就以集群模式进行分析即可。
前面我们分析了重平衡,那么这里我们就只看和offset初始化相关的部分
RebalanceService#run()一步一步来到初始化offset的地方
private boolean updateProcessQueueTableInRebalance(final String topic, final Set<MessageQueue> mqSet, final boolean isOrder) { boolean changed = false; //TODO: 省略部分代码........ List<PullRequest> pullRequestList = new ArrayList<PullRequest>(); for (MessageQueue mq : mqSet) { if (!this.processQueueTable.containsKey(mq)) { if (isOrder && !this.lock(mq)) { log.warn("doRebalance, {}, add a new mq failed, {}, because lock failed", consumerGroup, mq); continue; } this.removeDirtyOffset(mq); ProcessQueue pq = new ProcessQueue(); long nextOffset = -1L; try { //TODO:初始化偏移量 nextOffset = this.computePullFromWhereWithException(mq); } catch (MQClientException e) { log.info("doRebalance, {}, compute offset failed, {}", consumerGroup, mq); continue; } if (nextOffset >= 0) { ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq); if (pre != null) { log.info("doRebalance, {}, mq already exists, {}", consumerGroup, mq); } else { log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq); //TODO:构建拉取请求对象PullRequest PullRequest pullRequest = new PullRequest(); pullRequest.setConsumerGroup(consumerGroup); //TODO:重点:设置初始offset值 pullRequest.setNextOffset(nextOffset); pullRequest.setMessageQueue(mq); pullRequest.setProcessQueue(pq); pullRequestList.add(pullRequest); changed = true; } } else { log.warn("doRebalance, {}, add new mq failed, {}", consumerGroup, mq); } } } this.dispatchPullRequest(pullRequestList); return changed; }
初始化 offset值的地方就是 computePullFromWhereWithException(mq)方法,那么点进去看下它的内部逻辑:
RebalancePushImpl
@Override public long computePullFromWhereWithException(MessageQueue mq) throws MQClientException { long result = -1; //TODO:默认是 CONSUME_FROM_LAST_OFFSET final ConsumeFromWhere consumeFromWhere = this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer().getConsumeFromWhere(); //TODO:获取offsetStore类型,集群模式是RemoteBrokerOffsetStore,后面还会在单独说 final OffsetStore offsetStore = this.defaultMQPushConsumerImpl.getOffsetStore(); switch (consumeFromWhere) { //TOOD:前3个已经废弃 case CONSUME_FROM_LAST_OFFSET_AND_FROM_MIN_WHEN_BOOT_FIRST: case CONSUME_FROM_MIN_OFFSET: case CONSUME_FROM_MAX_OFFSET: case CONSUME_FROM_LAST_OFFSET: { //TODO:从broker中读取offset值 long lastOffset = offsetStore.readOffset(mq, ReadOffsetType.READ_FROM_STORE); if (lastOffset >= 0) { result = lastOffset; } // First start,no offset else if (-1 == lastOffset) { if (mq.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) { result = 0L; } else { try { result = this.mQClientFactory.getMQAdminImpl().maxOffset(mq); } catch (MQClientException e) { log.warn("Compute consume offset from last offset exception, mq={}, exception={}", mq, e); throw e; } } } else { result = -1; } break; } //TODO:从broker中读取offset值 case CONSUME_FROM_FIRST_OFFSET: { long lastOffset = offsetStore.readOffset(mq, ReadOffsetType.READ_FROM_STORE); if (lastOffset >= 0) { result = lastOffset; } else if (-1 == lastOffset) { result = 0L; } else { result = -1; } break; } //TODO:从broker中读取offset值 case CONSUME_FROM_TIMESTAMP: { long lastOffset = offsetStore.readOffset(mq, ReadOffsetType.READ_FROM_STORE); if (lastOffset >= 0) { result = lastOffset; } else if (-1 == lastOffset) { if (mq.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) { try { result = this.mQClientFactory.getMQAdminImpl().maxOffset(mq); } catch (MQClientException e) { log.warn("Compute consume offset from last offset exception, mq={}, exception={}", mq, e); throw e; } } else { try { long timestamp = UtilAll.parseDate(this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer().getConsumeTimestamp(), UtilAll.YYYYMMDDHHMMSS).getTime(); result = this.mQClientFactory.getMQAdminImpl().searchOffset(mq, timestamp); } catch (MQClientException e) { log.warn("Compute consume offset from last offset exception, mq={}, exception={}", mq, e); throw e; } } } else { result = -1; } break; } default: break; } return result; }
这里看到RocketMQ一个比较有意思的枚举 ConsumeFromWhere, 我这里就不对他展开描述了,如果大家感兴趣可以自己研究下,总之就是无论哪种配置策略都要先从broker读取offset,如果“没有”,则按照客户端的规则进行offset的初始化。
所以这里就是确定好初始offset. 确定好offset后,就可以构建PullRequest了,我们继续看前面的代码:
long nextOffset = -1L; try { //TODO:初始化offset nextOffset = this.computePullFromWhereWithException(mq); } catch (MQClientException e) { log.info("doRebalance, {}, compute offset failed, {}", consumerGroup, mq); continue; } if (nextOffset >= 0) { ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq); if (pre != null) { log.info("doRebalance, {}, mq already exists, {}", consumerGroup, mq); } else { log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq); PullRequest pullRequest = new PullRequest(); pullRequest.setConsumerGroup(consumerGroup); //TODO:设置初始化offset,拉取消息时,就从这个offset开始拉取 pullRequest.setNextOffset(nextOffset); pullRequest.setMessageQueue(mq); pullRequest.setProcessQueue(pq); pullRequestList.add(pullRequest); changed = true; } } else { log.warn("doRebalance, {}, add new mq failed, {}", consumerGroup, mq); }
这里就比较熟悉了,前面分析消费流程的时候我们也看到过,接下来就是将填充好MessageQueue(主要就是queueid), ProcessQueue(消费者本地缓存消息的队列),nextOffset(消费偏移量)的 PullRequest 对象放入拉取消息的类PullMessageService 中的pullRequestQueue队列属性中,这样PullMessageService 就可以去broker拉取消息了。
拉取消息的过程我们还只是关注消费偏移量(nextOffset)的变化
3.3 利用offset拉取消息
@Override public void run() { log.info(this.getServiceName() + " service started"); while (!this.isStopped()) { try { //TODO:从队列中取出 PullRequest PullRequest pullRequest = this.pullRequestQueue.take(); //TODO:拉取消息 this.pullMessage(pullRequest); } catch (InterruptedException ignored) { } catch (Exception e) { log.error("Pull Message Service Run Method exception", e); } } log.info(this.getServiceName() + " service end"); }
那么在拉取消息的过程中,offset是如何工作的呢?我们用一幅图来展示:

那么我们在从源码中简单看下拉取过程:
DefaultMQPushConsumerImpl#pullMessage(PullRequest pullRequest)
public void pullMessage(final PullRequest pullRequest) { //TODO:....省略诸多流控代码....... //TODO:消息从broker拉取成功后的本地回调处理 //TODO:PullResult 包含了从broker读取的消息,以及新的offset PullCallback pullCallback = new PullCallback() { @Override public void onSuccess(PullResult pullResult) { if (pullResult != null) { pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult, subscriptionData); switch (pullResult.getPullStatus()) { case FOUND: long prevRequestOffset = pullRequest.getNextOffset(); //TODO:重点关注 //TODO:将broker返回的新的offset值重新设置给PullRequest的 nextOffset //TODO:下次就用新的offset去broker读取消息 pullRequest.setNextOffset(pullResult.getNextBeginOffset()); long pullRT = System.currentTimeMillis() - beginTimestamp; DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullRT(pullRequest.getConsumerGroup(), pullRequest.getMessageQueue().getTopic(), pullRT); long firstMsgOffset = Long.MAX_VALUE; if (pullResult.getMsgFoundList() == null || pullResult.getMsgFoundList().isEmpty()) { DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest); } else { firstMsgOffset = pullResult.getMsgFoundList().get(0).getQueueOffset(); DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullTPS(pullRequest.getConsumerGroup(), pullRequest.getMessageQueue().getTopic(), pullResult.getMsgFoundList().size()); boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList()); DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest( pullResult.getMsgFoundList(), processQueue, pullRequest.getMessageQueue(), dispatchToConsume); if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0) { DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest, DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval()); } else { //TODO: 重点关注 //TODO:再次将PullRequest放入队列中 //TODO:还是原来的PullRequest对象,只不过offset更新了 DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest); } } //TODO:...省略部分代码...... break; case NO_NEW_MSG: case NO_MATCHED_MSG: pullRequest.setNextOffset(pullResult.getNextBeginOffset()); DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest); DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest); break; default: break; } } } @Override public void onException(Throwable e) { if (!pullRequest.getMessageQueue().getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) { log.warn("execute the pull request exception", e); } DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException); } }; //TODO:...省略部分代码。。。。。 //TODO:去broker拉取消息 try { this.pullAPIWrapper.pullKernelImpl( pullRequest.getMessageQueue(), subExpression, subscriptionData.getExpressionType(), subscriptionData.getSubVersion(), //TODO:从PullReqeust中获取offset值给broker pullRequest.getNextOffset(), this.defaultMQPushConsumer.getPullBatchSize(), sysFlag, commitOffsetValue, BROKER_SUSPEND_MAX_TIME_MILLIS, CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND, CommunicationMode.ASYNC, //TODO:上面构建的回调对象,当从broker拉取到消息后交给回调处理 pullCallback ); } catch (Exception e) { log.error("pullKernelImpl exception", e); this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException); } }
通过分析源码,我们也大概知道了在拉取消息过程中offset是如何变化的。那么接下来我们在看下offset是如何完成持久化的。
3.4 offset的更新和持久化
在消费者消费完消息后,就需要更新offset并完成持久化,我们我们就看下消费完成后处理消费结果的代码:
ConsumeMessageConcurrentlyService#processConsumeResult(...)
public void processConsumeResult( final ConsumeConcurrentlyStatus status, final ConsumeConcurrentlyContext context, final ConsumeRequest consumeRequest ) { int ackIndex = context.getAckIndex(); if (consumeRequest.getMsgs().isEmpty()) return; switch (status) { case CONSUME_SUCCESS: if (ackIndex >= consumeRequest.getMsgs().size()) { ackIndex = consumeRequest.getMsgs().size() - 1; } int ok = ackIndex + 1; int failed = consumeRequest.getMsgs().size() - ok; this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), ok); this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), failed); break; case RECONSUME_LATER: ackIndex = -1; this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), consumeRequest.getMsgs().size()); break; default: break; } switch (this.defaultMQPushConsumer.getMessageModel()) { case BROADCASTING: for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) { MessageExt msg = consumeRequest.getMsgs().get(i); log.warn("BROADCASTING, the message consume failed, drop it, {}", msg.toString()); } break; case CLUSTERING: List<MessageExt> msgBackFailed = new ArrayList<MessageExt>(consumeRequest.getMsgs().size()); for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) { MessageExt msg = consumeRequest.getMsgs().get(i); boolean result = this.sendMessageBack(msg, context); if (!result) { msg.setReconsumeTimes(msg.getReconsumeTimes() + 1); msgBackFailed.add(msg); } } if (!msgBackFailed.isEmpty()) { consumeRequest.getMsgs().removeAll(msgBackFailed); this.submitConsumeRequestLater(msgBackFailed, consumeRequest.getProcessQueue(), consumeRequest.getMessageQueue()); } break; default: break; } //TODO:关注这里即可 //TODO:从本地缓存队列中将消费的消息移除 long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs()); if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) { //TODO:更新offset this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), offset, true); } }
我们继续看下updateOffset的逻辑:

这里有两个实现类
- LocalFileOffsetStore : 广播模式使用它
- RemoteBrokerOffsetStore : 集群模式使用它,我们分析的就是集群模式
那么我们就看下RemoteBrokerOffsetStore的updateOffset的逻辑:
public class RemoteBrokerOffsetStore implements OffsetStore { //TOOD:....省略部分代码....... @Override public void updateOffset(MessageQueue mq, long offset, boolean increaseOnly) { if (mq != null) { AtomicLong offsetOld = this.offsetTable.get(mq); if (null == offsetOld) { offsetOld = this.offsetTable.putIfAbsent(mq, new AtomicLong(offset)); } if (null != offsetOld) { if (increaseOnly) { MixAll.compareAndIncreaseOnly(offsetOld, offset); } else { offsetOld.set(offset); } } } } }
其实很简单,就是将消费过的消息的offset(递增)更新到消费者本地offset变量表中。然后通过定时任务持久化到broker中。
接下来在简单看下消费者客户端持久化offset到broker. 在消费者启动时,会启动一个定时任务:
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { try { MQClientInstance.this.persistAllConsumerOffset(); } catch (Exception e) { log.error("ScheduledTask persistAllConsumerOffset exception", e); } } }, 1000 * 10, this.clientConfig.getPersistConsumerOffsetInterval(), TimeUnit.MILLISECONDS);
每隔5s将offset持久化到broker
我们在简单看下broker端如何处理offset
- 首先要接收客户端发送的offset
public class ConsumerManageProcessor extends AsyncNettyRequestProcessor implements NettyRequestProcessor { //TODO:.....省略其他code..... @Override public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { switch (request.getCode()) { case RequestCode.GET_CONSUMER_LIST_BY_GROUP: return this.getConsumerListByGroup(ctx, request); //TODO:更新offset case RequestCode.UPDATE_CONSUMER_OFFSET: return this.updateConsumerOffset(ctx, request); case RequestCode.QUERY_CONSUMER_OFFSET: return this.queryConsumerOffset(ctx, request); default: break; } return null; } //TODO:....省略其他代码 }
最终代码:
public class ConsumerOffsetManager extends ConfigManager { private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); private static final String TOPIC_GROUP_SEPARATOR = "@"; //TODO: key = topic@group //TODO: value = Map[key = queueId, value = offset] private ConcurrentMap<String/* topic@group */, ConcurrentMap<Integer, Long>> offsetTable = new ConcurrentHashMap<String, ConcurrentMap<Integer, Long>>(512); //TODO:....省略诸多code..... private void commitOffset(final String clientHost, final String key, final int queueId, final long offset) { ConcurrentMap<Integer, Long> map = this.offsetTable.get(key); if (null == map) { map = new ConcurrentHashMap<Integer, Long>(32); map.put(queueId, offset); this.offsetTable.put(key, map); } else { Long storeOffset = map.put(queueId, offset); if (storeOffset != null && offset < storeOffset) { log.warn("[NOTIFYME]update consumer offset less than store. clientHost={}, key={}, queueId={}, requestOffset={}, storeOffset={}", clientHost, key, queueId, offset, storeOffset); } } } //TODO:...... }
其实也很简单,就是将客户端发送的offset更新到broker的offset表中,然后再通过broker的定时任务持久化到文件中。那么我们在简单看下broker的持久化:
- broker的持久化
broker启动时,也会启动持久化的定时任务
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { try { BrokerController.this.consumerOffsetManager.persist(); } catch (Throwable e) { log.error("schedule persist consumerOffset error.", e); } } }, 1000 * 10, this.brokerConfig.getFlushConsumerOffsetInterval(), TimeUnit.MILLISECONDS); 复制代码
通过定时任务将上面ConsumerOffsetManager类中的offset表offsetTable写到文件中。至此,就完成了offset的更新和持久化。
4.总结
本文从源码的角度分析了RocketMQ的消费偏移量offset是如何被使用的,简单总结下持久化过程:
- 消费者客户端本地维护一个offset表,消费者消费完成后先更新到本地offset表中
- 消费者客户端启动时会开启一个定时任务,将本地的offset表发送到broker
- broker会接收消费者客户端发送的offset数据,并保存到broker的本地offset表中
- broker启动时也会开启定时任务,用于将broker的本地offset表中的数据持久化到文件中
关于offset的使用,请参考上面的图。
#Java##程序员##计算机#