40.RocketMQ之高频面试题大全

消息中间件如何选型

RabbitMQ

erlang开发,对消息堆积的支持并不好,当大量消息积压的时候,会导致 RabbitMQ 的性能急剧下降。每秒钟可以处理几万到十几万条消息。

RocketMQ

java开发,面向互联网集群化功能丰富,对在线业务的响应时延做了很多的优化,大多数情况下可以做到毫秒级的响应,每秒钟大概能处理几十万条消息。

Kafka

Scala开发,面向日志功能丰富,性能最高。当你的业务场景中,每秒钟消息数量没有那么多的时候,Kafka 的时延反而会比较高。所以,Kafka 不太适合在线业务场景。 ActiveMQ java开发,简单,稳定,性能不如前面三个。小型系统用也ok,但是不推荐。推荐用互联网主流的。

消息中间件的作用

因为项目比较大,做了分布式系统,所有远程服务调用请求都是同步执行经常出问题,所以引入了mq

作用描述
解耦系统耦合度降低,没有强依赖关系
异步不需要同步执行的远程调用可以有效提高响应时间
削峰请求达到峰值后,后端service还可以保持固定消费速率消费,不会被压垮
数据分发通过消息队列可以让数据在多个系统更加之间进行流通。数据的产生方不需要关心谁来使用数据,只需要将数据发送到消息队列,数据使用方直接在消息队列中直接获取数据即可。原来是需要A系统分别调用B C D 系统。现在如果新增E那么需要修改代码,需要在A系统调用E系统。通过消息队列可以让数据在多个系统更加之间进行流通。数据的产生方不需要关心谁来使用数据,只需要将数据发送到消息队列,数据使用方直接在消息队列中直接获取数据即可

RocketMQ角色

角色作用
Nameserver路由发现,无状态,动态列表。这也是和zookeeper的重要区别之一。zookeeper是有状态的。
Producer消息生产者,负责发消息到Broker。
Broker就是MQ本身,负责收发消息、消息存储等。
Consumer消息消费者,负责从Broker上拉取消息进行消费,消费完进行ack。

主要是发布订阅模式,主要组件:消息发送者、消息服务器(消息存储)、消息消费、。

NameServer

NameServer 是整个 RocketMQ 的“大脑”,它是 RocketMQ 的服务注册中心,所以 RocketMQ需要先启动 NameServer 再启动 Rocket 中的 Broker

Broker 在启动时向所有 NameServer 注册(主要是服务器地址等),生产者在发送消息之前先从 NameServer 获取 Broker 服务器地址列表(消费者一 样),然后根据负载均衡算法从列表中选择一台服务器进行消息发送。

NameServer与每台 Broker 服务保持长连接, NameServer每间隔 30S检查 Broker 是否存活,如果检测到 Broker 宕机,则从路由注册表中将其移除。这样就可以实现RocketMQ的高可用。

Broker和NameServer的关系

Broker会向所有的NameServer上注册自己的信息,而不是某一个,是每一个,全部!

RocketMQ中的Topic和queue区别?

queue就是来源于数据结构的FIFO队列。而Topic是个抽象的概念,每个Topic底层对应N个queue,而数据也真实存在queue上的。

消息分类Topic和Tag

消息消费是,消费者可以对同一主题下的消息按照规则只消费自己感兴趣的消息,可以支持在服务端与消费端的消息过滤机制。

消息标签,二级消息类型,用来进一步区分某个 Topic 下的消息分类。 TopicTag 都是业务上用来归类的标识,区分在于 Topic 是一级分类,而 Tag 可以理解为是二级分类。 以天猫交易平台为例,订单消息和支付消息属于不同业务类型的消息,分别创建 Topic_OrderTopic_Pay,其中订单消息根据商品品类以不同的 Tag 再进行细分,如电器类、男装类、女装类、化妆品类,最后他们都被各个不同的系统所接收。 通过合理的使用 TopicTag,可以让业务结构清晰,更可以提高效率。

您可能会有这样的疑问:到底什么时候该用 Topic,什么时候该用 Tag?

1)消息类型是否一致:如普通消息、事务消息、定时(延时)消息、顺序消息,不同的消息类型使用不同的 Topic,无法通过 Tag 进行区分。

2)业务是否相关联:没有直接关联的消息,如淘宝交易消息,京东物流消息使用不同的 Topic 进行区分;而同样是天猫交易消息,电器类订单、女 装类订单、化妆品类订单的消息可以用 Tag 进行区分。

3)消息优先级是否一致:如同样是物流消息,盒马必须小时内送达,天猫超市 24 小时内送达,淘宝物流则相对会慢一些,不同优先级的消息用不同的 Topic 进行区分。

4)消息量级是否相当:有些业务消息虽然量小但是实时性要求高,如果跟某些万亿量级的消息使用同一个 Topic,则有可能会因为过长的等待时间 而“饿死”,此时需要将不同量级的消息进行拆分,使用不同的 Topic

消息存储

一般 MQ 核心就是消息的存储,对存储一般来说两个维度:消息堆积能力和消息存储性能。RocketMQ 追求消息存储的高性能,引入内存映射机制,所有的主题消息顺序存储在同一个文件中。同时为了防止无限堆积,引入消息文件过期机制和文件存储空间报警机制。

消息高可用

Rocket 关机、断电等情况下,Rokcet 可以确保不丢失消息(同步刷盘机制不丢失,异步刷盘会丢失少量)。

另外如果 Rocket 服务器因为 CPU、内存、主板、磁盘等关键设备损坏导致无法开机,这个属于单点故障,该节点上的消息全部丢失,如果开启了 异步复制机制,Rocket 可以确保只丢失很少量消息。

如果引入双写机制,这样基本上可以满足消息可靠性要求极高的场景(毕竟两台主服务器同时故障的可能性还是非常小)

消息消费低延迟

RocketMQ在消息不发生消息堆积时,以长轮询模式实现准实时的消息推送模式。

确保消息必须被消费一次

消息确认机制(ACK)来确保消息至少被消费一次,一般 ACK 机制只能做到消息只被消费一次,有重复消费的可能。

消息回溯

已经消费完的消息,可以根据业务要求重新消费消息。

消息堆积

消息中间件的主要功能是异步解耦,还有个重要功能是挡住前端的数据洪峰,保证后端系统的稳定性,这就要求消息中间件具有一定的 消息堆积能力,RocketMQ 采用磁盘文件存储,所以堆积能力比较强,同时提供文件过期删除机制。

定时消息

定时消息,定时消息是指消息发送到 Rocket Broker 上之后,不被消费者理解消费,要到等待一定的时间才能进行消费,apache 的版本目 前只支持等待指定的时间才能被消费,不支持任意精度的定时消息消费。(一个说法是任意精度的定时消息会带来性能损耗,但是阿里云版本的 RocketMQ 却提供这样的功能,充值收费优先策略?)

消息重试机制

消息重试是指在消息消费时,如果发送异常,那么消息中间件需要支持消息重新投递,RocketMQ 支持消息重试机制。

Message Key

Key 一般用于消息在业务层面的唯一标识。对发送的消息设置好 Key,以后可以根据这个 Key 来查找消息。比如消息异常,消息丢失,进行查找会很 方便。

RocketMQ 会创建专门的索引文件,用来存储 Key 与消息的映射,由于是 Hash 索引,应尽量使 Key 唯一,避免潜在的哈希冲突。

TagKey 的主要差别是使用场景不同,Tag 用在 Consumer 代码中,用于服务端消息过滤,Key 主要用于通过命令进行查找消息 RocketMQ 并不能保证 message id 唯一,在这种情况下,生产者在 push 消息的时候可以给每条消息设定唯一的 key, 消费者可以通过 message key 保证对消息幂等处理。

消息被消费后会立即删除吗?

不会,RocketMQ每条消息都会持久化到CommitLog中,每个Consumer连接到Broker后会维持消费进度信息,当有消息消费后只是当前Consumer的消费进度(CommitLog的offset)更新了。

消息会堆积吗?什么时候清理过期消息?

先简单给大家说一下,其实默认broker会启动后台线程,这个后台线程会自动去检查CommitLog、ConsumeQueue文件,因为这些文件都是多个的,比如CommitLog会有多个,ConsumeQueue也会有多个。 然后如果是那种比较旧的超过72小时的文件,就会被删除掉,也就是说,默认来说,broker只会给你把数据保留3天而已,当然你也可以自己通过fileReservedTime来配置这个时间,要保留几天的时间。 这个定时检查过期数据文件的线程代码,在DefaultMessageStore这个类里,他的start0方法中会调用一个addScheduleTask0方法,里面会每隔10s定时调度执行一个后台检查任务,我们看下面的源码片段

image.png

上面就可以看到了,其实他是每隔10s,就会执行一个调度任务

这个调度任务里就会执行DefaultMessageStore,this.dleanFilesPeriodically0方法,其实就是会去周期性的清理掉磁盘上的数据文件,也就是超过72小时的CommitLog、ConsumeQueue文件,接着我们具体看看这里的清理逻辑,他其实里面包含了清理CommitLo和ConsumeQueue的清理逻辑,如下面源码片段。

在清理文件的时候,他会具体判断一下,如果当前时间是预先设置的凌晨4点,就会触发删除文件的逻辑,这个时间是默认的;或者是如果磁盘空间不足了,就是超过了85%的使用率了,立马会触发删除文件逻辑.

image.png

上面两个条件,第一个是说如果磁盘没有满,那么每天就默认一次会删除磁盘文件,默认就是凌晨4点执行,那个时候必然是业务低峰期,因为凌晨4点大部分人都睡觉了,无论什么业务都不会有太高业务量的。

第二个是说,如果磁盘使用率超过85%了,那么此时可以允许继续写入数据,但是此时会立马触发删除文件的逻辑如果磁盘使用率超过90%了,那么此时不允许在磁盘里写入新数悟,立马删除文件。这是因为,一旦磁盘满了,那么你写入磁盘会失败,此时你MO就彻底故障了 所以一且磁盘满了,也会立马删除文件的 在删除文件的时候,无非就是对文件进行遍历,如果一个文件超过72小时都没修改过了,此时就可以删除了,哪怕有的消息你可能还没消费过,但是此时也不会再让你消费了,就直接删除掉。这就是RocketMQ的一整套文件删除的逻辑和和机制

4.6版本默认72小时后会删除不再使用的CommitLog文件

  • 检查这个文件最后访问时间
  • 判断是否大于过期时间
  • 指定时间删除,默认凌晨4点

RocketMQ消费模式有几种?

消费模型由Consumer决定,消费维度为Topic。

  • 集群消费

1.一条消息只会被同Group中的一个Consumer消费

2.多个Group同时消费一个Topic时,每个Group都会有一个Consumer消费到数据

  • 广播消费

消息将对一 个Consumer Group 下的各个 Consumer 实例都消费一遍。即即使这些 Consumer 属于同一个Consumer Group ,消息也会被 Consumer Group 中的每个 Consumer 都消费一次。

消费消息是push还是pull?-重点看

RocketMQ没有真正意义的push,都是pull,虽然有push类,但实际底层实现采用的是长轮询机制,即拉取方式

broker端属性 longPollingEnable 标记是否开启长轮询。默认开启

// {@link org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#pullMessage()}
// 看到没,这是一只披着羊皮的狼,名字叫PushConsumerImpl,实际干的确是pull的活。
 
// 拉取消息,结果放到pullCallback里
this.pullAPIWrapper.pullKernelImpl(pullCallback);

实时性和消息堆积如何取舍

PULL方式

由消费者客户端主动向消息中间件(MQ消息服务器代理)拉取消息;采用Pull方式,如何设置Pull消息的拉取频率需要重点去考虑,举个例子来说,可能1分钟内连续来了1000条消息,然后2小时内没有新消息产生(概括起来说就是“消息延迟与忙等待”)

如果每次Pull的时间间隔比较久,会增加消息的延迟,即消息到达消费者的时间加长,MQ中消息的堆积量变大;若每次Pull的时间间隔较短,但是在一段时间内MQ中并没有任何消息可以消费,那么会产生很多无效的Pull请求的RPC开销,影响MQ整体的网络性能;

PUSH方式

由消息中间件(MQ消息服务器代理)主动地将消息推送给消费者;采用Push方式,可以尽可能实时地将消息发送给消费者进行消费。但是,在消费者的处理消息的能力较弱的时候(比如,消费者端的业务系统处理一条消息的流程比较复杂,其中的调用链路比较多导致消费时间比较久。概括起来地说就是“慢消费问题”),而MQ不断地向消费者Push消息,消费者端的缓冲区可能会溢出或者产生消息堆积,导致异常;

代码@1:如果开启了长轮询模式,则每次只挂起 5s,然后就去尝试拉取。

代码@2:如果不开启长轮询模式,则只挂起一次,挂起时间为 shortPollingTimeMills,然后去尝试查找消息。

代码@3:遍历 pullRequestTable,如果拉取任务的待拉取偏移量小于当前队列的最大偏移量时执行拉取,否则如果没有超过最大等待时间则等待,否则返回未拉取到消息,返回给消息拉取客户端

参考链接:blog.csdn.net/prestigedin…

为什么主动拉取不使用事件监听方式?

事件驱动方式是建立好长连接,由事件(发送数据)的方式来实时推送。

如果broker主动推送消息的话有可能push速度快,消费速度慢的情况,那么就会造成消息在consumer端堆积过多,同时又不能被其他consumer消费的情况。

而pull的方式可以根据当前自身情况来pull,不会造成过多的压力而造成瓶颈。

所以采取了pull的方式。消费者端的缓冲区可能会溢出,导致异常;

broker如何处理拉取请求的?

Consumer首次请求Broker

  • Broker中是否有符合条件的消息

  • 有 ->

    • 响应Consumer
    • 等待下次Consumer的请求
  • 没有

    • 挂起consumer的请求,即不断开连接,也不返回数据

    • 使用consumer的offset

      • DefaultMessageStore#ReputMessageService#run方法

        • 每隔1ms检查commitLog中是否有新消息,有的话写入到pullRequestTable
        • 当有新消息的时候返回请求
      • PullRequestHoldService 来Hold连接,每隔5s执行一次检查pullRequestTable有没有消息,有的话立即推送

RocketMQ如何做负载均衡?

通过Topic在多Broker中分布式存储实现。

producer端

发送端指定message queue发送消息到相应的broker,来达到写入时的负载均衡

  • 提升写入吞吐量,当多个producer同时向一个broker写入数据的时候,性能会下降
  • 消息分布在多broker中,为负载消费做准备

默认策略是平均选择:

  • producer维护一个自增index
  • 每次取节点会自增
  • index向所有broker个数取余
  • 自带容错策略

其他实现:

  • SelectMessageQueueByHash

    • hash的是传入的args
  • SelectMessageQueueByRandom

  • SelectMessageQueueByMachineRoom 没有实现

也可以自定义实现MessageQueueSelector接口中的select方法

MessageQueue select(final List<MessageQueue> mqs, final Message msg, final Object arg);

consumer端

采用的是平均分配算法来进行负载均衡。

其他负载均衡算法

平均分配策略(默认)(AllocateMessageQueueAveragely) 环形分配策略(AllocateMessageQueueAveragelyByCircle) 手动配置分配策略(AllocateMessageQueueByConfig) 机房分配策略(AllocateMessageQueueByMachineRoom) 一致性哈希分配策略(AllocateMessageQueueConsistentHash) 靠近机房策略(AllocateMachineRoomNearby)

consumer和queue不对等的时候会发生什么?

Consumer和queue会优先平均分配,如果Consumer少于queue的个数,则会存在部分Consumer消费多个queue的情况,如果Consumer等于queue的个数,那就是一个Consumer消费一个queue,如果Consumer个数大于queue的个数,那么会有部分Consumer空余出来,白白的浪费了。

全部评论

相关推荐

09-02 22:36
已编辑
北京邮电大学 Java
行业合作平台&nbsp;base&nbsp;北京1.&nbsp;自我介绍2.&nbsp;点评项目超卖问题怎么解决的3.&nbsp;乐观锁什么问题,比如说十个线程同时到,无限自旋次数,最多会相当于多少个请求打过来(10+9+8+……+1&nbsp;=&nbsp;55)4.&nbsp;高并发场景下有什么问题(就答了个线程安全)5.&nbsp;使用悲观锁呢,乐观锁重复请求会很多,悲观锁效率低,那该怎么办(这里没答出来,就说了个存到redis内预扣减)6.&nbsp;存到redis内预扣减的话lua脚本该怎么写7.&nbsp;为什么用lua脚本能保证原子性(这里稀里糊涂的说了单线程,然后面试官就扯到多线程,然后就说多线程具体是怎么多线程的)8.&nbsp;为什么使用redis效率就高,使用java能实现这种效率吗(说实话,这里没太明白要问什么)9.&nbsp;一人一单怎么解决的,使用了分布式锁10.&nbsp;setnx为什么能实现分布式锁11.&nbsp;使用分布式锁有一个问题,业务没执行完超时释放了,那怎么办12.&nbsp;使用redission了是吧,底层是怎么续期的呢(这里回了看门狗机制,然后直接让手撕看门狗)13.&nbsp;写了15min左右看门狗,写的依托,面试官也没说啥,看我前五分钟没动,说可以用伪代码,也不用完全一样(好人面试官)14.&nbsp;项目拷打完直接JVM,内存分区有什么15.&nbsp;元空间中存的是什么东西,运行时常量池里面存了什么,和堆中的字符串常量池有什么区别16.&nbsp;简单完整的说说垃圾回收(这里说JVM会监控内存然后内存不够就启动gc)17.&nbsp;什么情况下会触发gc(答了new一个对象空间不够的时候,还有system.gc()可能会调用)18.&nbsp;既然system.gc()不一定会启动gc,那么还要他干什么(我这直接蒙,不知道在问啥)19.&nbsp;判断无用对象方法(引用计数法、可达性分析)20.&nbsp;可达性分析法中,没有被gc&nbsp;roots标记的对象怎么处理(这里也不知道在说啥,直接答了一个标记的对象放到一块,然后剩下的直接清理)21.&nbsp;rabbitmq在项目中怎么用的22.&nbsp;什么情况下要使用rabbitmq23.&nbsp;又回到了项目,我要取消订单的时候mq该怎么使用(这里说取消订单放到另一个队列中)24.&nbsp;那么这两个队列一个是取消队列,一个是正常的下单队列,那要是取消队列的消息先被消费,然后才是下单队列被消费改哦怎么办(这个mq完全没见过,直接说不知道)25.&nbsp;线程池七大核心参数26.&nbsp;线程池提交任务怎么修改成核心线程-&gt;非核心线程-&gt;阻塞队列,这种方式(这里就答了同步队列,后来查了发现是要自定义阻塞队列)27.&nbsp;mysql建表的时候要注意什么28.&nbsp;mysql创建索引要注意什么29.&nbsp;limit有什么问题(答了深度分页问题)30.&nbsp;有了解过大模型什么的吗面试官人很好,手撕redission看门狗的时候宕机了5min也没push,有的地方答得慢了也会提醒,但是有的问题确实是没太看懂要问什么两个小时之后约9.4二面
查看30道真题和解析
点赞 评论 收藏
分享
08-30 19:17
门头沟学院 Java
###&nbsp;25.08.26&nbsp;浙大网新软件基本没问八股,都是项目拷打1.&nbsp;项目拷打2.&nbsp;布隆过滤器3.&nbsp;分布式AT,&nbsp;XA,&nbsp;TCC&nbsp;模式4.&nbsp;如何部署项目到服务器,常用的命令5.&nbsp;反问环节,主要是做什么业务,还有哪些地方需要加强###&nbsp;25.08.26&nbsp;杭州某小厂(20-99人)1.&nbsp;说说static修饰符,它的使用和其他的有什么区别吗2.&nbsp;volatile作用3.&nbsp;线程池核心参数,当有一个任务放到线程池时,如何处理,非核心线程在没有任务时如何处理4.&nbsp;redis分布式锁的实现原理5.&nbsp;redisson可重入锁是怎么实现的6.&nbsp;redisson实现的分布式锁是非阻塞还是阻塞的7.&nbsp;mysql索引有哪些以及分别有什么作用,为什么要使用B+树,为什么不使用其他数据结构8.&nbsp;mysql事务隔离级别有哪些,分别解决了什么问题9.&nbsp;可重复读和幻读有什么区别10.&nbsp;说一下OOM,&nbsp;哪些情况下会造成OOM。11.&nbsp;说一下内存泄露,举例说明内存泄露的场景,除了ThreadLocal还有哪些情况会造成内存泄露吗12.&nbsp;说一下接口幂等,举例有哪些场景会需要接口幂等13.&nbsp;说一下责任链模式,举例哪些场景会用到###&nbsp;25.08.27&nbsp;杭州小厂(100-499人)(已OC)####&nbsp;一面1.&nbsp;项目拷打(占较大部分时间)2.&nbsp;说说布隆过滤器3.&nbsp;spring自定义starter的步骤4.&nbsp;mysql索引有哪些5.&nbsp;explain&nbsp;命令的type字段是什么,有哪些内容,哪个最快和最慢,&nbsp;index走的是什么索引6.&nbsp;mysql有哪些锁7.&nbsp;mysql隔离级别,以及innoDB默认隔离级别是什么8.&nbsp;JVM的内存结构9.&nbsp;线程池核心参数,当任务来时的处理流程,&nbsp;核心线程与非核心线程有什么区别####&nbsp;二面1.&nbsp;说一下你觉得哪个项目更难一点2.&nbsp;介绍一下项目的这个难点,为什么难,怎样解决的3.&nbsp;对于高并发的这个分布式场景,如何保证缓存和数据库的一致性的同时满足高吞吐量4.&nbsp;数据库的量有多少,是怎么做处理的5.&nbsp;对一些AI工具有了解吗6.&nbsp;反问环节###&nbsp;25.08.28&nbsp;恒生电子&nbsp;(已OC)1.&nbsp;自我介绍2.&nbsp;对前端了解多少?(我简历没写有关前端的)3.&nbsp;Vue的双向绑定4.&nbsp;你现在使用的JDK版本是多少,分别有新特性我回答了8和17,&nbsp;结果17的新特性确实是忘记了5.&nbsp;说说字符流和字节流的底层6.&nbsp;常见集合,有哪些是有序的,TreeSet的底层实现,这些集合都是实现的哪个接口?7.&nbsp;JVM结构8.&nbsp;Stream流主要的操作9.&nbsp;Spring常用的注解有哪些10.&nbsp;使用过哪些数据库,除了mysql还有其他的吗11.&nbsp;还有一些也是常见的八股,记不太清了12.&nbsp;反问环节,主要的业务
面试题刺客退退退
点赞 评论 收藏
分享
08-29 20:59
已编辑
门头沟学院 Java
1.自我介绍2.项目拷打穿插八股:a.你在哪些功能中引入了新的组件?b.ES相比于MySQL好在哪?c.什么场景下用MySQL查找,什么场景下用ES?d.大数据量用ES就一定更好吗?e.你知道在分布式部署下ES可能出现哪些问题吗?f.讲讲RocketMQ在你的项目中的使用逻辑?g.你认为在你的理解中RocketMQ最重要的特性是什么?(顺序性,不丢失,不重复,幂等性,可用性)h.那你知道RocketMQ是怎么实现这些特性的吗?i.Kafka是怎么实现集群高可用,在Leader宕机情况下不会有消息丢失的?j.你平时是怎么学习技术的?k.你这边建立了二级缓存,那请问你是怎么保证二级缓存中数据一致性的?l.那你采用事务可能会带来一些什么问题?m.除了Caffeine还了解哪些本地缓存结构吗?n.知道Caffeine的内部结构吗?3.来讲讲Java中的Error是怎么出现的?会带来什么后果?4.你提到了OOM,请问什么情况下会出现OOM呢?5.那异常呢?Java中有哪些异常类型?6.你认为编译时异常和运行时异常该怎么去处理?什么时候要注意处理这些异常?7.来讲讲锁,对Synchronized了解吗?8.Synchronized是可重入锁吗?是公平锁吗?9.还了解哪些锁呢?10.假如你自己设计了一个并发包,我们可以丢弃Synchronized只用ReentrantLock吗?手撕:实现一个单链表首尾交叉相连,要求必须在原链表上操作例子:1-2-3-4-5-6-7输出:1-7-2-6-3-5-4反问1.改进建议?(对于大二来说挺不错的)[听怕了这句话,逢说必挂]2.几轮?(2-3轮)希望七夕我可以和offer长长久久,牛友也和另一半长长久久🥰
查看23道真题和解析
点赞 评论 收藏
分享
评论
1
6
分享

创作者周榜

更多
牛客网
牛客网在线编程
牛客网题解
牛客企业服务