首页
题库
公司真题
专项练习
面试题库
在线编程
面试
面试经验
AI 模拟面试
简历
求职
学习
基础学习课
实战项目课
求职辅导课
专栏&文章
竞赛
搜索
我要招人
发布职位
发布职位、邀约牛人
更多企业解决方案
AI面试、笔试、校招、雇品
HR免费试用AI面试
最新面试提效必备
登录
/
注册
仗剑行山水
淮北师范大学 Java
发布于辽宁
关注
已关注
取消关注
@Java第一人:
深度好文!RocketMQ高级进阶知识精讲!
为了使大家能够清晰明了,有层次的掌握这些知识,我们从生产者、Broker、消费者三个维度来讲解。 生产者 消息发送规则 在RocketMQ中,是基于多个Message Queue来实现类似于kafka的分区效果。如果一个Topic要发送和接收的数据量非常大,需要能支持增加并行处理的机器来提高处理速度,这时候一个Topic可以根据需求设置一个或多个Message Queue。Topic有了多个Message Queue 后,消息可以并行地向各个Message Queue发送,消费者也可以并行地从多个Message Queue读取消息并消费。 那么一个消息会发送到哪个Message Queue上呢,这个就需要我们的路由分发策略了。在Send的众多重载方法中,有这样一个参数 MessageQueueSelector。 RocketMQ中已经帮我们实现了三个实现类: SelectMessageQueueByHash(默认):它是一种不断自增、轮询的方式。 SelectMessageQueueByRandom:随机选择一个队列。 SelectMessageQueueByMachineRoom:返回空,没有实现。 如果上面这几个不能满足我们的需求,还可以自定义MessageQueueSelector,作为参数传进去: SendResult sendResult = producer.send(msg, new MessageQueueSelector() {@Overridepublic MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) { Integer id = (Integer) arg; int index = id % mqs.size(); return mqs.get(index); }}, orderId);复制代码 源码在example/ordermessage/Producer.java 顺序消息 一道很经典的面试题,如何保证消息的有序性?思路是,需要保证顺序的消息要发送到同一个message queue中。其次,一个message queue只能被一个消费者消费,这点是由消息队列的分配机制来保证的。最后,一个消费者内部对一个mq的消费要保证是有序的。我们要做到生产者 - message queue - 消费者之间是一对一对一的关系。 具体操作过程如下: 生产者发送消息的时候,到达Broker应该是有序的。所以对于生产者,不能使用多线程异步发送,而是顺序发送。 写入Broker的时候,应该是顺序写入的。也就是相同主题的消息应该集中写入,选择同一个Message Queue,而不是分散写入。 要达到这个效果很简单,只需要我们在发送的时候传入相同的hashKey,就会选择同一个队列。 3. 消费者消费的时候只能有一个线程,否则由于消费的速率不同,有可能出现记录到数据库的时候无序。 在Spring Boot中,consumeMode设置为ORDERLY,在Java API中,传入MessageListenerOrderly的实现类即可。 consumer.registerMessageListener(new MessageListenerOrderly() {复制代码 当然顺序消费会带来一些问题: 遇到消息失败的消息,无法跳过,当前队列消费暂停 降低了消息处理的性能 事务消息 分布式事务有很多种解决方案,其中一种就是使用RocketMQ的事务消息来达到最终一致性。下面我们来看下RocketMQ是怎么实现的。下面是RocketMQ官网的一张流程图,我们对照着图来分析讲解一下。 rocketmq.apache.org/rocketmq/th… 生产者向RocketMQ服务端发送半消息,什么叫半消息呢,就是暂不能投递消费者的消息,发送方已经将消息成功发送到了MQ服务端,此时消息被标记为暂不能投递状态,需要等待生产者对该消息的二次确认。 MQ服务端给生产者发送ack,告诉生产者半消息已经成功收到了。 发送方开始执行本地数据库事务的逻辑。 执行完成以后将结果告诉MQ服务端,本地事务执行成功就告诉commint,MQ Server收到commit后则将半消息状态置为可投递,consumer最终将收到该消息;本地事务执行失败则发送rollback,MQ Server收到rollback以后则删除半消息,订阅费将不会收到该条消息。 未收到第4步的确认信息时,回查事务状态。消息回查: 因为网络闪断、生产者重启等原因,RocketMQ 的发送方会提供一个反查事务状态接口,如果一段时间内半消息没有收到任何操作请求,那么 Broker 会通过反查接口得知发送方事务是否执行成功。 发送方收到消息回查后,需要检查对应消息的本地事务执行的最终结果。 发送方根据检查本地事务的最终状态再次提交二次确认,发送commit或者rollback。 上述就是整个事务消息的执行流程,下面我们来看下如何在代码中操作。 RocketMQ中提供了一个TransactionListener接口,我们需要实现它,然后在executeLocalTransaction方法中实现执行本地事务逻辑。 @Override public LocalTransactionState executeLocalTransaction(Message msg, Object arg) { //local transaction process,return rollback,commit or unknow log.info("executeLocalTransaction:"+JSON.toJSONString(msg)); return LocalTransactionState.UNKNOW; }复制代码 这个方法必须返回一个状态,rollback,commit或者unknow,返回unknow之后,因为不确定到底事务有没有成功,Broker会主动发起对事务执行结果的查询,所以还要再实现一个checkLocalTransaction回查方法。 @Override public LocalTransactionState checkLocalTransaction(MessageExt msg) { log.info("checkLocalTransaction:"+JSON.toJSONString(msg)); return LocalTransactionState.COMMIT_MESSAGE; }复制代码 默认回查总次数是15次,第一次回查的间隔是6s,后续每次间隔60s。最后在生产者发送的时候指定下事务***即可。 源码在example/transaction/TransactionProducer.java 延迟消息 很多时候,我们村会在这样的业务场景:在一段时间之后,完成一个工作任务的需求,例如:滴滴打车订单完成之后,如果用户一直不评价,48小时会将自动评价为5星;外卖下单30分钟不支付自动取消等等。这种问题的解决方案有很多种,其中一种就是用RocketMQ的延迟队列来实现,但是开源版本功能被***了,只能支持特定等级的消息,商业版可以任意指定时间。 msg.setDelayTimeLevel(2); // 5秒钟复制代码 比如leve=2代表5秒,一共支持18个等级,延迟的级别配置在代码MessageStoreConfig中: private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";复制代码 Spring Boot中这样使用 rocketMQTemplate.syncSend(topic,message,1000,2);// 5秒钟复制代码 源码在example/delay/DelayProducer.java Broker 物理存储 我们进入到RocketMQ存储的文件夹看一下,这个目录是我们在安装的时候指定的。 下面依次介绍下这几个文件夹的作用: checkpoint:文件检查点,存储commitlog、consumequeue、indexfile最后一次刷盘时间或时间戳。 commitlog:消息存储目录,一个文件集合,每个默认文件1G大小,当第一个文件写满了,第二个文件会以初始量命名。比如起始偏移量是1073741824,第二个文件名为00000000001073741824,以此类推。 config:运行时的配置信息,包含主题消息过滤信息、集群消费模式消息消费进度、延迟消息队列拉取进度、消息消费组配置信息、topic配置属性等。 consumequeue:消息消费队列存储目录,我们可以看到在consumequeue文件夹下是按topic的名字建文件夹,在每一个topic下面又是按message queue的编号建文件夹,在每个message queue文件夹下就是存放消息在commit log的偏移量以及大小和Tag属性。 5. index:消息索引文件存储目录,在前面使用java api发送消息的时候,我们看到会传入一个keys的参数,它是用来检索消息的。所以如果出现keys,服务端就会创建索引文件,以空格分割的每个关键字都会产生一个索引。单个IndexFile可以保存2000W个索引,文件固定大小约为400M。索引使用的是哈希索引,所以key尽量设置为唯一不重复。 存储理念 我们来看下RocketMQ官网的说明,rocketmq.apache.org/rocketmq/ho… ,我们来导读一下,首先是说kafka为什么不能支持更多的分区,然后说在RocketMQ中我们是如何支持更多分区的。 每个分区存储整个消息数据。虽然每个分区被有序地写入磁盘,但随着并发写入分区数量的增加,从操作系统的角度来看,写入变得随机。 由于数据文件分散,难以使用Linux IO Group Commit机制。 所以RocketMQ干脆另辟蹊径,设计了一种新的文件文件存储方式,就是所有topic的所有消息全部写在同一个文件中,这样就能够保证绝对的顺序写。当然消费的时候就复杂了,要到一个巨大的commitlog中去查找消息,我们不可能遍历所有消息吧,这样效率太慢了。 那怎么办呢?这个就是上面提到的consume queue,它把consume group消费的topic的最后消费到的offset存储在里面。当我们消费的时候,先从consume queue读取持久化消息的起始物理位置偏移量offset、大小size和消息tag的hashcode值,随后再从commitlog中进行读取待拉取消费消息的真正实体内容部分。 consume queue可以理解为消息的索引,它里面没有消息,当然这样的存储理念也不是十全十美,对于commitlog来说,写的时候虽然是顺序写,但是读的时候却变成了完全的随机读;读一条消息先会读consume queue,再读commit log,这样增加了开销。 文件清理策略 跟kalka一样,commit log的内容在消费之后是不会删除,这样做有两个好处,一个是可以被多个consumer group重复消费,只要修改consumer group,就可以从头开始消费,每个consumer group维护自己的offset;另一个是支持消息回溯,随时可以搜索。 但是如果不清理文件的话,文件数量不断地增加,最终会导致磁盘可用空间越来越少,所以RocketMQ会将commitLog、consume queue这些过期文件进行删除,默认是超过72个小时的文件。这里会启动两个线程去跑。 private void cleanFilesPeriodically() { this.cleanCommitLogService.run(); this.cleanConsumeQueueService.run(); }复制代码 过期文件选出来以后,什么时候去清理呢,有两种情况。一种是通过定时任务,每天凌晨四点去删除这些文件。第二种是磁盘使用空间超过75% 了,这时候已经火烧眉毛了,我还等到你四点干嘛,立即马上就清理了。 如果情况更严重,如果磁盘空间使用率超过85%,会开始批量清理文件,不管有没有过期,直到空间充足;如果磁盘使用率超过90%,会拒绝消息写入。 零拷贝 大家都知道RocketMQ的消息是存储在磁盘上的,但是怎么还能做到这么低的延迟和这么高的吞吐量,其中的一个奥秘就是使用到了零拷贝技术。 首先和大家介绍一下page Cache的概念,这个是操作系统层面的,CPU如果要读取或者操作磁盘上的数据,必须要把磁盘的数据加载到内存中,这个加载的大小有一个固定的单位,叫做Page。x86的linux中一个标准的页大小是4kb。如果要提升磁盘的访问速度,或者说减少磁盘的IO,可以把访问过的Page在内存中缓存起来,这个内存的区域就叫做Page Cache。 下次处理IO请求的时候,先到Page Cache中查找,找到了就直接操作,没找到再到磁盘中去找。当然Page Cache本身也会对数据进行预读,对于每个文件的第一个读请求操作,系统也会将所请求的页的相邻后几个页一起读出来。但是这里还有个问题,我们知道虚拟内存分为内核空间和用户空间,Page Cache属于内核空间,用户空间访问不了,还需要从内核空间拷贝到用户空间缓冲区,这个copy的过程就降低了数据访问的速度。 为了解决这个问题,就产生了零拷贝技术,干脆把Page Cache的数据在用户空间中做一个地址映射,这样用户进行就可以通过指针操作直接读写Page Cache,不再需要系统调用(例如read())和内存拷贝。RocketMQ中具体的实现是使用mmap(memory map,内存映射),而kafka用的是sendfile。 消费者 消费端的负载均衡与rebalance 和kafka一样,消费端也会针对Message Queue做负载均衡,使得每个消费者能够合理的消费多个分区的消息。消费者挂了,消费者增加,这时候就会用到我们的rebalance。 在RebalanceImpl.class的277行有rebalance的策略 AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy; List<MessageQueue> allocateResult = null; try { allocateResult = strategy.allocate(this.consumerGroup, this.mQClientFactory.getClientId(), mqAll, cidAll); } catch (Throwable e) { log.error("AllocateMessageQueueStrategy.allocate Exception. allocateMessageQueueStrategyName={}", strategy.getName(),e); return; }复制代码 AllocateMessageQueueStrategy有6种实现的策略,也可以自定义实现,在消费者端指定即可。 consumer.setAllocateMessageQueueStrategy();复制代码 AllocateMessageQueueAveragely:平均分配算法(默认) AllocateMessageQueueAveragelyByCircle:环状分配消息队列 AllocateMessageQueueByConfig:按照配置来分配队列,根据用户指定的配置来进行负载 AllocateMessageQueueByMachineRoom:按照指定机房来配置队列 AllocateMachineRoomNearby:按照就近机房来配置队列 AllocateMessageQueueConsistentHash:一致性hash,根据消费者的cid进行 队列的数量尽量要大于消费者的数量。 重试与死信队列 在消费者端如果出现异常,比如数据库不可用、网络出现问题、中途断电等等,这时候返回给Broker的是RECONSUME_LATER,表示稍后重试。这个时候消息会发回到Broker,进入到RocketMQ的重试队列中。服务端会为consumer group创建一个名字为%RETRY%开头的重试队列。 重试队列过一段时间后再次投递到这个ConsumerGroup,如果还是异常,会再次进入到重试队列。重试的时间间隔会不断衰减,从10秒开始直到2个小时:10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h,最多重试16次。 而如果一直这样重复消费都持续失败到一定次数(默认16次),就会投递到DLQ死信队列。Broker会创建一个死信队列,死信队列的名字是%DLQ%+ConsumerGroupName,应用可以监控死信队列来做人工干预。一般情况下我们在实际生产中是不需要重试16次,这样既浪费时间又浪费性能,理论上当尝试重复次数达到我们想要的结果时如果还是消费失败,那么我们需要将对应的消息进行记录,并且结束重复尝试。 源码在jackxu/SimpleConsumer.java MQ选型分析 下面列出市面上常见的三种MQ的分析对比,供大家在项目中实际使用的时候参考对比: 好,RocketMQ系列到这里就结束了,感谢大家的观看~
点赞 0
评论 0
全部评论
推荐
最新
楼层
暂无评论,快来抢首评~
相关推荐
01-25 10:54
门头沟学院 大数据开发工程师
数据开发在AI时代的转变,数仓、RAG(检索增强)与 Agent如何支持企业级AI
传统数仓、数据库和分析系统,为业务提供了稳定的数据支持和决策依据。但当数据开始面对 LLM 时,就产生了一个全新的挑战:如何将企业数据转化为模型可感知、可检索、可执行的“上下文”。在企业 AI 系统中,这一过程通常经历三个关键环节:1. 数仓与数据库:提供干净、统一、结构化的业务数据。2. 搜索与 RAG:让模型能够快速、稳定地访问相关信息。3. 工作流与 Agent:将数据能力编排成可执行的任务,实现复杂业务的闭环。图中展示了这一链路的概览。通过这条链路,可以发现数据不再只是“给人看的”,而是成为 AI 执行的核心驱动力。一、从数仓到 LLM:数据“服务对象”的变化在之前的文章中(微信公众号...
AI时代的工作 VS 传...
点赞
评论
收藏
分享
01-23 16:53
长春吉大附中实验学校 产品经理
做入职体检时发现“肺部阴影”怎么办?会影响入职吗?
刚做完入职体检,就收到 “胸透提示肺部阴影” 的消息——这是不少职场人最近的糟心经历。一看到“阴影”俩字,立马联想到“被公司拒绝”、甚至“癌症”等情况,越想越慌。其实肺部阴影不一定影响入职,更≠严重疾病,更今天就拆解 3 种常见情况,教你怎么应对,避免白白焦虑!情况一:肺部炎症为什么会有阴影?入秋天气干燥,很多人感冒、咳嗽没彻底好,肺部出现轻微炎症(比如支气管炎、肺炎),胸透时就会显示“片状阴影”。这种情况在年轻人里特别常见,尤其是刚换城市、熬夜赶面试的求职者。HR 会拒绝吗?大概率不会!只要拿着胸透报告去呼吸内科就诊,医生开“抗感染治疗方案”(比如吃1~2周消炎药),等炎症消退后复查,阴影基...
点赞
评论
收藏
分享
2025-12-04 14:39
门头沟学院 临床研究
双非本科结束秋招~
成为🐧孝子被腾讯录取的那一天,一切都将作废,之前的所有面试都作废,之前的所有offer都作废,之前的学历作废,星星✨作废,月亮🌙作废,银河系🌌作废。上岸腾讯的那一天,中国最好的互联网企业加持,就如同一场大雪覆盖了自己前半生的荒唐。😡😡😡😡😡一是考虑到base地,二是团队氛围好,三是诚意很足秋招其他offer:美团,百度,字节,虾皮,阿里钉钉除了美团和钉钉外,其他给的都还可以,其他挂的就不说了,加油牛油们!
Ku1s:
查看图片
点赞
评论
收藏
分享
01-27 16:44
卓驭科技_HR(准入职员工)
卓驭(大疆车载)内推
自我介绍随后对项目经历的真实性进行了核实,包括项目背景、团队构成及个人分工;面试官询问是否亲自编写过IIC和SPI的底层驱动代码(回答为是);对比IIC与SPI通信的区别(主要涉及速率方面IIC较低而SPI较高、IIC为半双工而SPI为全双工、通信结构上IIC支持多主多从而SPI一般为单主多从点对点模式);介绍BootLoader进行固件升级的整体流程;解释static关键字的用途(可用于修饰变量和函数);阐述函数指针的概念及其实际应用场景(例如在BootLoader中实现向应用程序的跳转);说明结构体变量的几种初始化方式(包括定义后逐成员赋值、定义时按声明顺序初始化、以及定义时通过指定成员名...
点赞
评论
收藏
分享
评论
点赞成功,聊一聊 >
点赞
收藏
分享
评论
提到的真题
返回内容
全站热榜
更多
1
...
一个普通人在985学计算机的四年
5149
2
...
挣钱虽不多,但也弥补了校园时期的遗憾
3228
3
...
mentor视角下的优秀实习生
2918
4
...
J人永远闲不下来于是去提前实习
2684
5
...
大厂提前实习对AI开发的新感悟
2383
6
...
互联网大厂的AI相关岗位全面介绍
2348
7
...
面试官视角聊聊,怎么准备AI大模型产品面试?
2184
8
...
我用Notion+AI整理面经,2周从迷茫到拿3个offer
2079
9
...
浅浅分享一下近况
2072
10
...
拥抱AI,程序员的最后出路
2021
创作者周榜
更多
正在热议
更多
#
今年春招是金一银二嘛?
#
11262次浏览
138人参与
#
抛开难度不谈,你最想去哪家公司?
#
6264次浏览
137人参与
#
赚钱的意义在这一刻具象化
#
5437次浏览
133人参与
#
AI求职实录
#
5753次浏览
165人参与
#
没关系,至少我的__很曼妙
#
4749次浏览
88人参与
#
为什么有人零实习也能进大厂?
#
6806次浏览
149人参与
#
AI时代的工作 VS 传统时代的工作,有哪些不同?
#
9804次浏览
229人参与
#
1月小结:你过的开心吗?
#
2345次浏览
55人参与
#
你的第一家实习公司是什么档次?
#
5429次浏览
82人参与
#
当你问AI“你会取代我的工作吗”,它说_?
#
4724次浏览
158人参与
#
你的landing期是如何度过的?
#
10103次浏览
195人参与
#
除了Java,最推荐学什么技术?
#
7581次浏览
173人参与
#
牛客吐槽大会
#
4426次浏览
91人参与
#
机械人你知道哪些单休企业
#
83287次浏览
415人参与
#
你觉得什么岗位会被AI替代
#
37112次浏览
256人参与
#
秋招结束之后的日子
#
117178次浏览
1062人参与
#
机械人春招想让哪家公司来捞你?
#
379521次浏览
3141人参与
#
你在职场上见过哪些“水货”同事
#
30902次浏览
168人参与
#
哪些瞬间让你真切感受到了工作的乐趣
#
23326次浏览
101人参与
#
实习想申请秋招offer,能不能argue薪资
#
215555次浏览
1163人参与
牛客网
牛客网在线编程
牛客网题解
牛客企业服务