首页
题库
公司真题
专项练习
面试题库
在线编程
面试
面试经验
AI 模拟面试
简历
求职
学习
基础学习课
实战项目课
求职辅导课
专栏&文章
竞赛
我要招人
发布职位
发布职位、邀约牛人
更多企业解决方案
AI面试、笔试、校招、雇品
HR免费试用AI面试
最新面试提效必备
登录
/
注册
仗剑行山水
淮北师范大学 Java
发布于辽宁
关注
已关注
取消关注
@Java第一人:
深度好文!RocketMQ高级进阶知识精讲!
为了使大家能够清晰明了,有层次的掌握这些知识,我们从生产者、Broker、消费者三个维度来讲解。 生产者 消息发送规则 在RocketMQ中,是基于多个Message Queue来实现类似于kafka的分区效果。如果一个Topic要发送和接收的数据量非常大,需要能支持增加并行处理的机器来提高处理速度,这时候一个Topic可以根据需求设置一个或多个Message Queue。Topic有了多个Message Queue 后,消息可以并行地向各个Message Queue发送,消费者也可以并行地从多个Message Queue读取消息并消费。 那么一个消息会发送到哪个Message Queue上呢,这个就需要我们的路由分发策略了。在Send的众多重载方法中,有这样一个参数 MessageQueueSelector。 RocketMQ中已经帮我们实现了三个实现类: SelectMessageQueueByHash(默认):它是一种不断自增、轮询的方式。 SelectMessageQueueByRandom:随机选择一个队列。 SelectMessageQueueByMachineRoom:返回空,没有实现。 如果上面这几个不能满足我们的需求,还可以自定义MessageQueueSelector,作为参数传进去: SendResult sendResult = producer.send(msg, new MessageQueueSelector() {@Overridepublic MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) { Integer id = (Integer) arg; int index = id % mqs.size(); return mqs.get(index); }}, orderId);复制代码 源码在example/ordermessage/Producer.java 顺序消息 一道很经典的面试题,如何保证消息的有序性?思路是,需要保证顺序的消息要发送到同一个message queue中。其次,一个message queue只能被一个消费者消费,这点是由消息队列的分配机制来保证的。最后,一个消费者内部对一个mq的消费要保证是有序的。我们要做到生产者 - message queue - 消费者之间是一对一对一的关系。 具体操作过程如下: 生产者发送消息的时候,到达Broker应该是有序的。所以对于生产者,不能使用多线程异步发送,而是顺序发送。 写入Broker的时候,应该是顺序写入的。也就是相同主题的消息应该集中写入,选择同一个Message Queue,而不是分散写入。 要达到这个效果很简单,只需要我们在发送的时候传入相同的hashKey,就会选择同一个队列。 3. 消费者消费的时候只能有一个线程,否则由于消费的速率不同,有可能出现记录到数据库的时候无序。 在Spring Boot中,consumeMode设置为ORDERLY,在Java API中,传入MessageListenerOrderly的实现类即可。 consumer.registerMessageListener(new MessageListenerOrderly() {复制代码 当然顺序消费会带来一些问题: 遇到消息失败的消息,无法跳过,当前队列消费暂停 降低了消息处理的性能 事务消息 分布式事务有很多种解决方案,其中一种就是使用RocketMQ的事务消息来达到最终一致性。下面我们来看下RocketMQ是怎么实现的。下面是RocketMQ官网的一张流程图,我们对照着图来分析讲解一下。 rocketmq.apache.org/rocketmq/th… 生产者向RocketMQ服务端发送半消息,什么叫半消息呢,就是暂不能投递消费者的消息,发送方已经将消息成功发送到了MQ服务端,此时消息被标记为暂不能投递状态,需要等待生产者对该消息的二次确认。 MQ服务端给生产者发送ack,告诉生产者半消息已经成功收到了。 发送方开始执行本地数据库事务的逻辑。 执行完成以后将结果告诉MQ服务端,本地事务执行成功就告诉commint,MQ Server收到commit后则将半消息状态置为可投递,consumer最终将收到该消息;本地事务执行失败则发送rollback,MQ Server收到rollback以后则删除半消息,订阅费将不会收到该条消息。 未收到第4步的确认信息时,回查事务状态。消息回查: 因为网络闪断、生产者重启等原因,RocketMQ 的发送方会提供一个反查事务状态接口,如果一段时间内半消息没有收到任何操作请求,那么 Broker 会通过反查接口得知发送方事务是否执行成功。 发送方收到消息回查后,需要检查对应消息的本地事务执行的最终结果。 发送方根据检查本地事务的最终状态再次提交二次确认,发送commit或者rollback。 上述就是整个事务消息的执行流程,下面我们来看下如何在代码中操作。 RocketMQ中提供了一个TransactionListener接口,我们需要实现它,然后在executeLocalTransaction方法中实现执行本地事务逻辑。 @Override public LocalTransactionState executeLocalTransaction(Message msg, Object arg) { //local transaction process,return rollback,commit or unknow log.info("executeLocalTransaction:"+JSON.toJSONString(msg)); return LocalTransactionState.UNKNOW; }复制代码 这个方法必须返回一个状态,rollback,commit或者unknow,返回unknow之后,因为不确定到底事务有没有成功,Broker会主动发起对事务执行结果的查询,所以还要再实现一个checkLocalTransaction回查方法。 @Override public LocalTransactionState checkLocalTransaction(MessageExt msg) { log.info("checkLocalTransaction:"+JSON.toJSONString(msg)); return LocalTransactionState.COMMIT_MESSAGE; }复制代码 默认回查总次数是15次,第一次回查的间隔是6s,后续每次间隔60s。最后在生产者发送的时候指定下事务***即可。 源码在example/transaction/TransactionProducer.java 延迟消息 很多时候,我们村会在这样的业务场景:在一段时间之后,完成一个工作任务的需求,例如:滴滴打车订单完成之后,如果用户一直不评价,48小时会将自动评价为5星;外卖下单30分钟不支付自动取消等等。这种问题的解决方案有很多种,其中一种就是用RocketMQ的延迟队列来实现,但是开源版本功能被***了,只能支持特定等级的消息,商业版可以任意指定时间。 msg.setDelayTimeLevel(2); // 5秒钟复制代码 比如leve=2代表5秒,一共支持18个等级,延迟的级别配置在代码MessageStoreConfig中: private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";复制代码 Spring Boot中这样使用 rocketMQTemplate.syncSend(topic,message,1000,2);// 5秒钟复制代码 源码在example/delay/DelayProducer.java Broker 物理存储 我们进入到RocketMQ存储的文件夹看一下,这个目录是我们在安装的时候指定的。 下面依次介绍下这几个文件夹的作用: checkpoint:文件检查点,存储commitlog、consumequeue、indexfile最后一次刷盘时间或时间戳。 commitlog:消息存储目录,一个文件集合,每个默认文件1G大小,当第一个文件写满了,第二个文件会以初始量命名。比如起始偏移量是1073741824,第二个文件名为00000000001073741824,以此类推。 config:运行时的配置信息,包含主题消息过滤信息、集群消费模式消息消费进度、延迟消息队列拉取进度、消息消费组配置信息、topic配置属性等。 consumequeue:消息消费队列存储目录,我们可以看到在consumequeue文件夹下是按topic的名字建文件夹,在每一个topic下面又是按message queue的编号建文件夹,在每个message queue文件夹下就是存放消息在commit log的偏移量以及大小和Tag属性。 5. index:消息索引文件存储目录,在前面使用java api发送消息的时候,我们看到会传入一个keys的参数,它是用来检索消息的。所以如果出现keys,服务端就会创建索引文件,以空格分割的每个关键字都会产生一个索引。单个IndexFile可以保存2000W个索引,文件固定大小约为400M。索引使用的是哈希索引,所以key尽量设置为唯一不重复。 存储理念 我们来看下RocketMQ官网的说明,rocketmq.apache.org/rocketmq/ho… ,我们来导读一下,首先是说kafka为什么不能支持更多的分区,然后说在RocketMQ中我们是如何支持更多分区的。 每个分区存储整个消息数据。虽然每个分区被有序地写入磁盘,但随着并发写入分区数量的增加,从操作系统的角度来看,写入变得随机。 由于数据文件分散,难以使用Linux IO Group Commit机制。 所以RocketMQ干脆另辟蹊径,设计了一种新的文件文件存储方式,就是所有topic的所有消息全部写在同一个文件中,这样就能够保证绝对的顺序写。当然消费的时候就复杂了,要到一个巨大的commitlog中去查找消息,我们不可能遍历所有消息吧,这样效率太慢了。 那怎么办呢?这个就是上面提到的consume queue,它把consume group消费的topic的最后消费到的offset存储在里面。当我们消费的时候,先从consume queue读取持久化消息的起始物理位置偏移量offset、大小size和消息tag的hashcode值,随后再从commitlog中进行读取待拉取消费消息的真正实体内容部分。 consume queue可以理解为消息的索引,它里面没有消息,当然这样的存储理念也不是十全十美,对于commitlog来说,写的时候虽然是顺序写,但是读的时候却变成了完全的随机读;读一条消息先会读consume queue,再读commit log,这样增加了开销。 文件清理策略 跟kalka一样,commit log的内容在消费之后是不会删除,这样做有两个好处,一个是可以被多个consumer group重复消费,只要修改consumer group,就可以从头开始消费,每个consumer group维护自己的offset;另一个是支持消息回溯,随时可以搜索。 但是如果不清理文件的话,文件数量不断地增加,最终会导致磁盘可用空间越来越少,所以RocketMQ会将commitLog、consume queue这些过期文件进行删除,默认是超过72个小时的文件。这里会启动两个线程去跑。 private void cleanFilesPeriodically() { this.cleanCommitLogService.run(); this.cleanConsumeQueueService.run(); }复制代码 过期文件选出来以后,什么时候去清理呢,有两种情况。一种是通过定时任务,每天凌晨四点去删除这些文件。第二种是磁盘使用空间超过75% 了,这时候已经火烧眉毛了,我还等到你四点干嘛,立即马上就清理了。 如果情况更严重,如果磁盘空间使用率超过85%,会开始批量清理文件,不管有没有过期,直到空间充足;如果磁盘使用率超过90%,会拒绝消息写入。 零拷贝 大家都知道RocketMQ的消息是存储在磁盘上的,但是怎么还能做到这么低的延迟和这么高的吞吐量,其中的一个奥秘就是使用到了零拷贝技术。 首先和大家介绍一下page Cache的概念,这个是操作系统层面的,CPU如果要读取或者操作磁盘上的数据,必须要把磁盘的数据加载到内存中,这个加载的大小有一个固定的单位,叫做Page。x86的linux中一个标准的页大小是4kb。如果要提升磁盘的访问速度,或者说减少磁盘的IO,可以把访问过的Page在内存中缓存起来,这个内存的区域就叫做Page Cache。 下次处理IO请求的时候,先到Page Cache中查找,找到了就直接操作,没找到再到磁盘中去找。当然Page Cache本身也会对数据进行预读,对于每个文件的第一个读请求操作,系统也会将所请求的页的相邻后几个页一起读出来。但是这里还有个问题,我们知道虚拟内存分为内核空间和用户空间,Page Cache属于内核空间,用户空间访问不了,还需要从内核空间拷贝到用户空间缓冲区,这个copy的过程就降低了数据访问的速度。 为了解决这个问题,就产生了零拷贝技术,干脆把Page Cache的数据在用户空间中做一个地址映射,这样用户进行就可以通过指针操作直接读写Page Cache,不再需要系统调用(例如read())和内存拷贝。RocketMQ中具体的实现是使用mmap(memory map,内存映射),而kafka用的是sendfile。 消费者 消费端的负载均衡与rebalance 和kafka一样,消费端也会针对Message Queue做负载均衡,使得每个消费者能够合理的消费多个分区的消息。消费者挂了,消费者增加,这时候就会用到我们的rebalance。 在RebalanceImpl.class的277行有rebalance的策略 AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy; List<MessageQueue> allocateResult = null; try { allocateResult = strategy.allocate(this.consumerGroup, this.mQClientFactory.getClientId(), mqAll, cidAll); } catch (Throwable e) { log.error("AllocateMessageQueueStrategy.allocate Exception. allocateMessageQueueStrategyName={}", strategy.getName(),e); return; }复制代码 AllocateMessageQueueStrategy有6种实现的策略,也可以自定义实现,在消费者端指定即可。 consumer.setAllocateMessageQueueStrategy();复制代码 AllocateMessageQueueAveragely:平均分配算法(默认) AllocateMessageQueueAveragelyByCircle:环状分配消息队列 AllocateMessageQueueByConfig:按照配置来分配队列,根据用户指定的配置来进行负载 AllocateMessageQueueByMachineRoom:按照指定机房来配置队列 AllocateMachineRoomNearby:按照就近机房来配置队列 AllocateMessageQueueConsistentHash:一致性hash,根据消费者的cid进行 队列的数量尽量要大于消费者的数量。 重试与死信队列 在消费者端如果出现异常,比如数据库不可用、网络出现问题、中途断电等等,这时候返回给Broker的是RECONSUME_LATER,表示稍后重试。这个时候消息会发回到Broker,进入到RocketMQ的重试队列中。服务端会为consumer group创建一个名字为%RETRY%开头的重试队列。 重试队列过一段时间后再次投递到这个ConsumerGroup,如果还是异常,会再次进入到重试队列。重试的时间间隔会不断衰减,从10秒开始直到2个小时:10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h,最多重试16次。 而如果一直这样重复消费都持续失败到一定次数(默认16次),就会投递到DLQ死信队列。Broker会创建一个死信队列,死信队列的名字是%DLQ%+ConsumerGroupName,应用可以监控死信队列来做人工干预。一般情况下我们在实际生产中是不需要重试16次,这样既浪费时间又浪费性能,理论上当尝试重复次数达到我们想要的结果时如果还是消费失败,那么我们需要将对应的消息进行记录,并且结束重复尝试。 源码在jackxu/SimpleConsumer.java MQ选型分析 下面列出市面上常见的三种MQ的分析对比,供大家在项目中实际使用的时候参考对比: 好,RocketMQ系列到这里就结束了,感谢大家的观看~
点赞 0
评论 0
全部评论
推荐
最新
楼层
暂无评论,快来抢首评~
相关推荐
10-09 23:12
已编辑
美团_金融服务_后端开发(准入职员工)
秋招京东科技后端一面(含后续)
背景信息:美团实习,笔试全ak总结:15min实习+15min八股(结合简历上的Spring项目),无手撕(已经意向,后续面经都会更新)详细:先自我介绍,然后把重点引到暑期实习上,自然而然的问实习---实习---1.给我讲讲你暑期实习着手于的系统,看看你对系统的理解答:略(因人而异)2.讲讲你实习中遇到的最大的技术挑战?答:略(因人而异)---八股---3.那在美团的实习里,你主要用了哪些Java或Spring的技术你觉得有些技术深度的,咱们可以详细聊聊答:我在这里通过一个需求说了Java中的LiteFlow,感兴趣的兄弟可以去查查看;4.那这个LiteFlow的使用,是不是涉及到Java的设...
查看12道真题和解析
点赞
评论
收藏
分享
10-11 17:10
门头沟学院 C++
奇安信-测试工程师-一面
9.15投递->9.20笔试->10.11一面,时长40分钟左右女面试官,给我的第一眼印象是那种高校女教授的感觉,不像是一个企业的技术员工1.没有自我介绍,直接问最近在干什么?学习?实习?学校课程?2.聊了接近20分钟的毕业课题内容(不懂一直问这个干啥)3.读研为什么选这个方向?对应的岗位了解过?为什么工作意向和研究方向不一致?课题是深度学习,工作选择测试,两者之间有关系吗?选了一个不感兴趣的研究方向,然后工作又干测试?(之前也看过面经问这种问题的,不知道是不是因为那些毕业年限比较长的面试官对现在的硕士培养体系不太了解,现在有几个硕士找的工作能和自己研究方向相匹配的啊)4.笔试成绩...
查看10道真题和解析
点赞
评论
收藏
分享
09-14 17:35
武汉科技大学 算法工程师
27届找实习求拷打
投了好几天了,都是已读不回,真的要哭死了,xdm帮看看简历,是不是简历不好呀😭😭😭
点赞
评论
收藏
分享
09-17 19:25
已编辑
太原理工大学 游戏测试
boss偶遇瓦学妹hr,拼尽全力无法战胜
哈哈哈😋
叁六玖:
公司名发我,我要这个HR带我打瓦
我的秋招日记
点赞
评论
收藏
分享
10-06 22:15
Fibocom广和通_助理工程师
10月秋招来得及吗
很多小伙伴从9月开始投递,投递的都石沉大海没有一点消息。但是大家真的不用太焦虑。9月能拿到offer的大多都是92,而他们手上不止1个offer,可能还在挑更好的。所以很快释放。其次就是很多公司都是9月中旬才开启招聘,从hr晒简历,再到面试,一轮二轮三轮的面试整个流程时间也很长。可能还没有轮到你的简历。所以这个阶段,除了一遍投简历,更重要的就是抓紧优化简历,复习,练习。一旦有面试机会一定要抓住。倘若你已经进入一轮面,但是一轮面完短暂时间内又没消息,此时你觉得的面试情况比较不错,那可能是还在池子里泡着。有比你更优秀的人,或者旗鼓相当的。hr还在抉择。可以等一段时间后再联系一下hr问一下是否还有机...
点赞
评论
收藏
分享
评论
点赞成功,聊一聊 >
点赞
收藏
分享
评论
提到的真题
返回内容
全站热榜
更多
1
...
HR面,到底该准备些啥(附核心问题回答思路)
1.7W
2
...
如何委婉地拒绝offer
6198
3
...
分享一个很友好的公司
2184
4
...
恒生电子笔试交流
2116
5
...
小红书一面面经
1825
6
...
绿盟 安全工程师
1433
7
...
【彻底战败】秋招11次一面挂
1428
8
...
抖音电商泡一个月挂了!
1163
9
...
前端 社招 面经分享:前端两年都问些啥 ❓️❓️❓️
1163
10
...
七牛云路演
1144
创作者周榜
更多
正在热议
更多
#
谈薪时HR压价该怎么应对
#
227002次浏览
3236人参与
#
新凯来求职进展汇总
#
47330次浏览
122人参与
#
爱玛科技集团求职进展汇总
#
21399次浏览
173人参与
#
什么专业适合考公
#
41168次浏览
243人参与
#
机械制造岗投递时间线
#
31296次浏览
379人参与
#
招银网络求职进展汇总
#
157560次浏览
975人参与
#
深信服秋招来了
#
278166次浏览
2914人参与
#
毕业租房也有小确幸
#
138172次浏览
4480人参与
#
华为海思工作体验
#
27891次浏览
117人参与
#
百度工作体验
#
266544次浏览
2109人参与
#
互联网公司爆料
#
143221次浏览
702人参与
#
26届秋招投递记录
#
45399次浏览
493人参与
#
许愿池
#
316462次浏览
2896人参与
#
面试被问“你的缺点是什么?”怎么答
#
149638次浏览
1898人参与
#
央国企投递记录
#
120422次浏览
1509人参与
#
校招求职有谈薪空间吗
#
173643次浏览
2195人参与
#
电信求职进展汇总
#
18092次浏览
115人参与
#
实习中的菜狗时刻
#
436876次浏览
3499人参与
#
找工作前vs找工作后的心路变化
#
24802次浏览
213人参与
#
机械校招之路总结
#
101956次浏览
2014人参与
#
深信服提前批进度交流
#
93835次浏览
678人参与
#
格力求职进展汇总
#
188089次浏览
1251人参与
牛客网
牛客网在线编程
牛客网题解
牛客企业服务