首页
题库
公司真题
专项练习
面试题库
在线编程
面试
面试经验
AI 模拟面试
简历
求职
学习
基础学习课
实战项目课
求职辅导课
专栏&文章
竞赛
我要招人
发布职位
发布职位、邀约牛人
更多企业解决方案
AI面试、笔试、校招、雇品
HR免费试用AI面试
最新面试提效必备
登录
/
注册
仗剑行山水
淮北师范大学 Java
发布于辽宁
关注
已关注
取消关注
@Java第一人:
深度好文!RocketMQ高级进阶知识精讲!
为了使大家能够清晰明了,有层次的掌握这些知识,我们从生产者、Broker、消费者三个维度来讲解。 生产者 消息发送规则 在RocketMQ中,是基于多个Message Queue来实现类似于kafka的分区效果。如果一个Topic要发送和接收的数据量非常大,需要能支持增加并行处理的机器来提高处理速度,这时候一个Topic可以根据需求设置一个或多个Message Queue。Topic有了多个Message Queue 后,消息可以并行地向各个Message Queue发送,消费者也可以并行地从多个Message Queue读取消息并消费。 那么一个消息会发送到哪个Message Queue上呢,这个就需要我们的路由分发策略了。在Send的众多重载方法中,有这样一个参数 MessageQueueSelector。 RocketMQ中已经帮我们实现了三个实现类: SelectMessageQueueByHash(默认):它是一种不断自增、轮询的方式。 SelectMessageQueueByRandom:随机选择一个队列。 SelectMessageQueueByMachineRoom:返回空,没有实现。 如果上面这几个不能满足我们的需求,还可以自定义MessageQueueSelector,作为参数传进去: SendResult sendResult = producer.send(msg, new MessageQueueSelector() {@Overridepublic MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) { Integer id = (Integer) arg; int index = id % mqs.size(); return mqs.get(index); }}, orderId);复制代码 源码在example/ordermessage/Producer.java 顺序消息 一道很经典的面试题,如何保证消息的有序性?思路是,需要保证顺序的消息要发送到同一个message queue中。其次,一个message queue只能被一个消费者消费,这点是由消息队列的分配机制来保证的。最后,一个消费者内部对一个mq的消费要保证是有序的。我们要做到生产者 - message queue - 消费者之间是一对一对一的关系。 具体操作过程如下: 生产者发送消息的时候,到达Broker应该是有序的。所以对于生产者,不能使用多线程异步发送,而是顺序发送。 写入Broker的时候,应该是顺序写入的。也就是相同主题的消息应该集中写入,选择同一个Message Queue,而不是分散写入。 要达到这个效果很简单,只需要我们在发送的时候传入相同的hashKey,就会选择同一个队列。 3. 消费者消费的时候只能有一个线程,否则由于消费的速率不同,有可能出现记录到数据库的时候无序。 在Spring Boot中,consumeMode设置为ORDERLY,在Java API中,传入MessageListenerOrderly的实现类即可。 consumer.registerMessageListener(new MessageListenerOrderly() {复制代码 当然顺序消费会带来一些问题: 遇到消息失败的消息,无法跳过,当前队列消费暂停 降低了消息处理的性能 事务消息 分布式事务有很多种解决方案,其中一种就是使用RocketMQ的事务消息来达到最终一致性。下面我们来看下RocketMQ是怎么实现的。下面是RocketMQ官网的一张流程图,我们对照着图来分析讲解一下。 rocketmq.apache.org/rocketmq/th… 生产者向RocketMQ服务端发送半消息,什么叫半消息呢,就是暂不能投递消费者的消息,发送方已经将消息成功发送到了MQ服务端,此时消息被标记为暂不能投递状态,需要等待生产者对该消息的二次确认。 MQ服务端给生产者发送ack,告诉生产者半消息已经成功收到了。 发送方开始执行本地数据库事务的逻辑。 执行完成以后将结果告诉MQ服务端,本地事务执行成功就告诉commint,MQ Server收到commit后则将半消息状态置为可投递,consumer最终将收到该消息;本地事务执行失败则发送rollback,MQ Server收到rollback以后则删除半消息,订阅费将不会收到该条消息。 未收到第4步的确认信息时,回查事务状态。消息回查: 因为网络闪断、生产者重启等原因,RocketMQ 的发送方会提供一个反查事务状态接口,如果一段时间内半消息没有收到任何操作请求,那么 Broker 会通过反查接口得知发送方事务是否执行成功。 发送方收到消息回查后,需要检查对应消息的本地事务执行的最终结果。 发送方根据检查本地事务的最终状态再次提交二次确认,发送commit或者rollback。 上述就是整个事务消息的执行流程,下面我们来看下如何在代码中操作。 RocketMQ中提供了一个TransactionListener接口,我们需要实现它,然后在executeLocalTransaction方法中实现执行本地事务逻辑。 @Override public LocalTransactionState executeLocalTransaction(Message msg, Object arg) { //local transaction process,return rollback,commit or unknow log.info("executeLocalTransaction:"+JSON.toJSONString(msg)); return LocalTransactionState.UNKNOW; }复制代码 这个方法必须返回一个状态,rollback,commit或者unknow,返回unknow之后,因为不确定到底事务有没有成功,Broker会主动发起对事务执行结果的查询,所以还要再实现一个checkLocalTransaction回查方法。 @Override public LocalTransactionState checkLocalTransaction(MessageExt msg) { log.info("checkLocalTransaction:"+JSON.toJSONString(msg)); return LocalTransactionState.COMMIT_MESSAGE; }复制代码 默认回查总次数是15次,第一次回查的间隔是6s,后续每次间隔60s。最后在生产者发送的时候指定下事务***即可。 源码在example/transaction/TransactionProducer.java 延迟消息 很多时候,我们村会在这样的业务场景:在一段时间之后,完成一个工作任务的需求,例如:滴滴打车订单完成之后,如果用户一直不评价,48小时会将自动评价为5星;外卖下单30分钟不支付自动取消等等。这种问题的解决方案有很多种,其中一种就是用RocketMQ的延迟队列来实现,但是开源版本功能被***了,只能支持特定等级的消息,商业版可以任意指定时间。 msg.setDelayTimeLevel(2); // 5秒钟复制代码 比如leve=2代表5秒,一共支持18个等级,延迟的级别配置在代码MessageStoreConfig中: private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";复制代码 Spring Boot中这样使用 rocketMQTemplate.syncSend(topic,message,1000,2);// 5秒钟复制代码 源码在example/delay/DelayProducer.java Broker 物理存储 我们进入到RocketMQ存储的文件夹看一下,这个目录是我们在安装的时候指定的。 下面依次介绍下这几个文件夹的作用: checkpoint:文件检查点,存储commitlog、consumequeue、indexfile最后一次刷盘时间或时间戳。 commitlog:消息存储目录,一个文件集合,每个默认文件1G大小,当第一个文件写满了,第二个文件会以初始量命名。比如起始偏移量是1073741824,第二个文件名为00000000001073741824,以此类推。 config:运行时的配置信息,包含主题消息过滤信息、集群消费模式消息消费进度、延迟消息队列拉取进度、消息消费组配置信息、topic配置属性等。 consumequeue:消息消费队列存储目录,我们可以看到在consumequeue文件夹下是按topic的名字建文件夹,在每一个topic下面又是按message queue的编号建文件夹,在每个message queue文件夹下就是存放消息在commit log的偏移量以及大小和Tag属性。 5. index:消息索引文件存储目录,在前面使用java api发送消息的时候,我们看到会传入一个keys的参数,它是用来检索消息的。所以如果出现keys,服务端就会创建索引文件,以空格分割的每个关键字都会产生一个索引。单个IndexFile可以保存2000W个索引,文件固定大小约为400M。索引使用的是哈希索引,所以key尽量设置为唯一不重复。 存储理念 我们来看下RocketMQ官网的说明,rocketmq.apache.org/rocketmq/ho… ,我们来导读一下,首先是说kafka为什么不能支持更多的分区,然后说在RocketMQ中我们是如何支持更多分区的。 每个分区存储整个消息数据。虽然每个分区被有序地写入磁盘,但随着并发写入分区数量的增加,从操作系统的角度来看,写入变得随机。 由于数据文件分散,难以使用Linux IO Group Commit机制。 所以RocketMQ干脆另辟蹊径,设计了一种新的文件文件存储方式,就是所有topic的所有消息全部写在同一个文件中,这样就能够保证绝对的顺序写。当然消费的时候就复杂了,要到一个巨大的commitlog中去查找消息,我们不可能遍历所有消息吧,这样效率太慢了。 那怎么办呢?这个就是上面提到的consume queue,它把consume group消费的topic的最后消费到的offset存储在里面。当我们消费的时候,先从consume queue读取持久化消息的起始物理位置偏移量offset、大小size和消息tag的hashcode值,随后再从commitlog中进行读取待拉取消费消息的真正实体内容部分。 consume queue可以理解为消息的索引,它里面没有消息,当然这样的存储理念也不是十全十美,对于commitlog来说,写的时候虽然是顺序写,但是读的时候却变成了完全的随机读;读一条消息先会读consume queue,再读commit log,这样增加了开销。 文件清理策略 跟kalka一样,commit log的内容在消费之后是不会删除,这样做有两个好处,一个是可以被多个consumer group重复消费,只要修改consumer group,就可以从头开始消费,每个consumer group维护自己的offset;另一个是支持消息回溯,随时可以搜索。 但是如果不清理文件的话,文件数量不断地增加,最终会导致磁盘可用空间越来越少,所以RocketMQ会将commitLog、consume queue这些过期文件进行删除,默认是超过72个小时的文件。这里会启动两个线程去跑。 private void cleanFilesPeriodically() { this.cleanCommitLogService.run(); this.cleanConsumeQueueService.run(); }复制代码 过期文件选出来以后,什么时候去清理呢,有两种情况。一种是通过定时任务,每天凌晨四点去删除这些文件。第二种是磁盘使用空间超过75% 了,这时候已经火烧眉毛了,我还等到你四点干嘛,立即马上就清理了。 如果情况更严重,如果磁盘空间使用率超过85%,会开始批量清理文件,不管有没有过期,直到空间充足;如果磁盘使用率超过90%,会拒绝消息写入。 零拷贝 大家都知道RocketMQ的消息是存储在磁盘上的,但是怎么还能做到这么低的延迟和这么高的吞吐量,其中的一个奥秘就是使用到了零拷贝技术。 首先和大家介绍一下page Cache的概念,这个是操作系统层面的,CPU如果要读取或者操作磁盘上的数据,必须要把磁盘的数据加载到内存中,这个加载的大小有一个固定的单位,叫做Page。x86的linux中一个标准的页大小是4kb。如果要提升磁盘的访问速度,或者说减少磁盘的IO,可以把访问过的Page在内存中缓存起来,这个内存的区域就叫做Page Cache。 下次处理IO请求的时候,先到Page Cache中查找,找到了就直接操作,没找到再到磁盘中去找。当然Page Cache本身也会对数据进行预读,对于每个文件的第一个读请求操作,系统也会将所请求的页的相邻后几个页一起读出来。但是这里还有个问题,我们知道虚拟内存分为内核空间和用户空间,Page Cache属于内核空间,用户空间访问不了,还需要从内核空间拷贝到用户空间缓冲区,这个copy的过程就降低了数据访问的速度。 为了解决这个问题,就产生了零拷贝技术,干脆把Page Cache的数据在用户空间中做一个地址映射,这样用户进行就可以通过指针操作直接读写Page Cache,不再需要系统调用(例如read())和内存拷贝。RocketMQ中具体的实现是使用mmap(memory map,内存映射),而kafka用的是sendfile。 消费者 消费端的负载均衡与rebalance 和kafka一样,消费端也会针对Message Queue做负载均衡,使得每个消费者能够合理的消费多个分区的消息。消费者挂了,消费者增加,这时候就会用到我们的rebalance。 在RebalanceImpl.class的277行有rebalance的策略 AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy; List<MessageQueue> allocateResult = null; try { allocateResult = strategy.allocate(this.consumerGroup, this.mQClientFactory.getClientId(), mqAll, cidAll); } catch (Throwable e) { log.error("AllocateMessageQueueStrategy.allocate Exception. allocateMessageQueueStrategyName={}", strategy.getName(),e); return; }复制代码 AllocateMessageQueueStrategy有6种实现的策略,也可以自定义实现,在消费者端指定即可。 consumer.setAllocateMessageQueueStrategy();复制代码 AllocateMessageQueueAveragely:平均分配算法(默认) AllocateMessageQueueAveragelyByCircle:环状分配消息队列 AllocateMessageQueueByConfig:按照配置来分配队列,根据用户指定的配置来进行负载 AllocateMessageQueueByMachineRoom:按照指定机房来配置队列 AllocateMachineRoomNearby:按照就近机房来配置队列 AllocateMessageQueueConsistentHash:一致性hash,根据消费者的cid进行 队列的数量尽量要大于消费者的数量。 重试与死信队列 在消费者端如果出现异常,比如数据库不可用、网络出现问题、中途断电等等,这时候返回给Broker的是RECONSUME_LATER,表示稍后重试。这个时候消息会发回到Broker,进入到RocketMQ的重试队列中。服务端会为consumer group创建一个名字为%RETRY%开头的重试队列。 重试队列过一段时间后再次投递到这个ConsumerGroup,如果还是异常,会再次进入到重试队列。重试的时间间隔会不断衰减,从10秒开始直到2个小时:10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h,最多重试16次。 而如果一直这样重复消费都持续失败到一定次数(默认16次),就会投递到DLQ死信队列。Broker会创建一个死信队列,死信队列的名字是%DLQ%+ConsumerGroupName,应用可以监控死信队列来做人工干预。一般情况下我们在实际生产中是不需要重试16次,这样既浪费时间又浪费性能,理论上当尝试重复次数达到我们想要的结果时如果还是消费失败,那么我们需要将对应的消息进行记录,并且结束重复尝试。 源码在jackxu/SimpleConsumer.java MQ选型分析 下面列出市面上常见的三种MQ的分析对比,供大家在项目中实际使用的时候参考对比: 好,RocketMQ系列到这里就结束了,感谢大家的观看~
点赞 0
评论 0
全部评论
推荐
最新
楼层
暂无评论,快来抢首评~
相关推荐
10-09 09:32
海康威视_自动化开发工程师(准入职员工)
海康威视内推,海康威视内推码
分享一下自己对海康的感受,也在海康总部的3期。 之前看了网上的评论实属是有点吓人的,但是百闻不如一见自己终究是亲自感受了一下。 这可能是我国内外大大小小加起来的第6段实习或者工作。 海康首先给我的感觉是人真的好多,尤其食堂的人,我可能上学都没有见过这么多人,还有电梯,我每次坐是一头雾水。当然这些对于我来说都不是很重要。 可能很多人最关心的就是海康的工作强度和时间是不是真如网上说的那么严重,而通过这段时间的感受,我觉得海康可能是我节奏最慢的一次体验,完成了任务就可以开开心心的回家了,根本不需要无效加班,如果自己想学点产品类的知识还是可以在公司里多学一点的。 关于部门小组氛围,我一开始是有点惊讶的...
海康威视公司福利 1032人发布
点赞
评论
收藏
分享
10-09 14:15
卓驭科技_HR(准入职员工)
卓驭(大疆车载)内推
卓驭 嵌入式中间件实习 面经写一写面经,回馈一下社区。⌚️timeline:五月底👋part1:自我介绍 && 项目介绍1. 项目里的内存占用,资源使用的性能评估?性能优化的思考?2. 端侧大模型的选型?3. 机器人比赛中最难的一个问题?技术方案的选择用了多长的时间?4. 之前实习的主要工作?方案是如何确定的?5. 对车载中间件的了解?6. 。。。忘了🤏part2:八股拷打1. 设计模式?平时开发有用到过哪一些设计模式吗?2. 对多态的了解?静态and动态?3. 虚函数里面父类和子类的交互?4. C++容器中vector和list的差异?5. vector的底层实现原理?扩...
点赞
评论
收藏
分享
09-10 16:08
郑州轻工业大学 Java
霸王茶姬Java
霸王茶姬卡学历吗😳😳
团子请爱我一次_十月...:
不是戈门,干哪来了,这就是java嘛
点赞
评论
收藏
分享
08-27 20:12
江西财经大学 测试开发
后悔接字节offer了
第一天入职,后悔没有早点来😍入职就发m4,配4k显示器,送办公大礼包业务基本纯开不测,做的项目也非常有挑战性麻烦包三餐,食堂非常顶,有一种回大学的错觉😭同事也特别好,第一天一起吃中晚饭聊的特别来,ld也比较年轻好沟通。
钝角p:
干几个10点下班就老实了
投递字节跳动等公司10个岗位
点赞
评论
收藏
分享
10-08 19:31
点点互动_产品管理工程师(准入职员工)
点点互动内推
点点互动面经一面(1)传统自我介绍(2)我看你大学里做了项目是吧,我们来聊聊吧(撕数据库、计算机网络)数据库(3)欸,你用的是 MySQL 吧,我想知道如果我要生成用户的唯一id,有什么方法啊?(4)那我如果有一个主键值是 10,然后删除了这一行,插入下一行数据的时候,主键值是多少呀?(5)你知道 MySQL 的索引的索引数据结构吗?(6)那你知道聚簇索引和非聚簇索引吗?计算机网络(7)你项目中前后端用什么协议通信的?「 HTTP 」为什么 说 HTTP 是无状态的呢?(8)那如何防止 Cookie 劫持?(9)那我如果使用 HTTPS 协议, Cookie 就不会被第三方拿到吗?(10)那你...
点赞
评论
收藏
分享
评论
点赞成功,聊一聊 >
点赞
收藏
分享
评论
提到的真题
返回内容
全站热榜
更多
1
...
26国考公告出炉,放宽到38岁意味着什么
5038
2
...
京东官宣发布新车,会有新的HC吗?
4353
3
...
害,找工作哪有不上当的!
4329
4
...
从摆烂到OC,嵌入式人的血泪史
4301
5
...
懂车帝二面 2025.10.11 1h32min
3550
6
...
牛牛求救🆘,不敢梭哈后端第二技能点怎么搭配
3100
7
...
下一站回家
2741
8
...
27届速通第一段前端实习后续--节孝子启动!
2635
9
...
双非秋招大厂time line参考
2536
10
...
找到靠谱的公司,少走些弯路
2367
创作者周榜
更多
正在热议
更多
#
找工作中的小确幸
#
7332次浏览
76人参与
#
秋招踩过的“雷”,希望你别再踩
#
14396次浏览
167人参与
#
深信服秋招来了
#
280445次浏览
2917人参与
#
面包vs爱情,怎么选?
#
15614次浏览
167人参与
#
实习在多还是在精
#
1885次浏览
35人参与
#
发面经攒人品
#
2327216次浏览
32435人参与
#
爱玛科技集团求职进展汇总
#
29550次浏览
208人参与
#
实习下班不想学习,正常吗?
#
2712次浏览
43人参与
#
反问环节如何提问
#
106607次浏览
2001人参与
#
机械求职避坑tips
#
67069次浏览
449人参与
#
校招谈薪一定要知道的事
#
1929次浏览
33人参与
#
你觉得什么岗位会被AI替代
#
3819次浏览
74人参与
#
贝壳求职进展汇总
#
35819次浏览
199人参与
#
机械人值得去的小众企业
#
24090次浏览
54人参与
#
秋招结束之后的日子
#
87948次浏览
985人参与
#
浪潮求职进展汇总
#
17686次浏览
137人参与
#
投格力的你,拿到offer了吗?
#
118899次浏览
686人参与
#
诺瓦星云求职进展汇总
#
219669次浏览
1715人参与
#
新凯来求职进展汇总
#
51777次浏览
131人参与
#
Offer比较,你最看重什么?
#
216303次浏览
1394人参与
#
职场新人体验
#
86641次浏览
611人参与
#
实习教会我的事
#
31613次浏览
274人参与
牛客网
牛客网在线编程
牛客网题解
牛客企业服务