如何测试中间件RocketMQ
测试 RocketMQ 主要涉及性能测试、可靠性测试和容灾测试,需验证其在高并发、高吞吐量场景下的消息处理能力,以及故障恢复能力。以下是详细的测试方案和步骤:
一、测试目标
- 性能测试:吞吐量(TPS):每秒生产/消费的消息量。延迟:消息从生产到消费的时间。稳定性:长时间运行是否出现性能下降或内存泄漏。
- 可靠性测试:消息不丢失:确保消息在 Broker 宕机、网络抖动等场景下不丢失。顺序性:顺序消息的严格顺序消费。
- 容灾测试:主从切换:主节点故障后,从节点是否能快速接管。数据恢复:磁盘损坏后,消息是否能从副本恢复。
二、测试工具
1. RocketMQ 自带工具
- benchmark 工具:producer.sh / consumer.sh:内置性能测试脚本。命令示例:
# 启动生产者压测(发送 100 万条消息,16 线程) sh tools.sh org.apache.rocketmq.example.benchmark.Producer \ -t TestTopic -n nameserver_ip:9876 -w 16 -s 1024 -c 1000000
- admin 命令:通过 mqadmin 查看集群状态、监控指标。
2. JMeter + RocketMQ 插件
- 安装插件:使用 rocketmq-jmeter 插件,支持模拟生产者和消费者。
- 配置示例:添加 RocketMQ Producer Sampler,配置 NameServer、Topic、消息体。
3. 自定义测试脚本
- Java/Python SDK:编写代码模拟生产者和消费者,记录消息轨迹和延迟。
三、性能测试方案
1. 生产者性能测试
- 目标:验证单节点/集群的最大消息发送能力。
- 配置:异步发送模式(DefaultMQProducer)。调整批量发送大小(sendMsgTimeout、compressed)。
- 监控指标:TPS:生产者每秒发送消息数。平均延迟:消息从生产到 Broker 确认的时间。Broker CPU/内存:观察资源是否成为瓶颈。
2. 消费者性能测试
- 目标:验证消息拉取和处理的吞吐量。
- 配置:Push 或 Pull 模式(DefaultMQPushConsumer)。调整并发线程数(consumeThreadMin、consumeThreadMax)。
- 监控指标:消费 TPS:消费者每秒处理的消息数。消费延迟:消息从生产到消费的时间差。积压消息数:检查是否有消息堆积。
3. 综合场景测试
- 混合读写:同时运行生产者和消费者,模拟真实场景。
- 顺序消息测试:验证顺序消息的生产和消费严格有序。
四、可靠性测试方案
1. 消息持久化测试
- 场景:同步刷盘(flushDiskType=SYNC_FLUSH) vs 异步刷盘(ASYNC_FLUSH)。Broker 宕机后重启,检查未消费消息是否恢复。
- 验证方法:生产者发送消息后,强制终止 Broker 进程,重启后查看消息是否完整。
2. 主从切换测试
- 场景:主节点(Master)宕机,观察从节点(Slave)是否自动切换为主。切换期间消息是否正常生产和消费。
- 验证方法:使用 kill -9 模拟主节点宕机,通过 mqadmin clusterList 检查集群状态。
3. 网络分区测试
- 场景:模拟 NameServer 与 Broker 网络中断。生产者/消费者与 Broker 网络中断。
- 验证方法:使用 iptables 阻断网络,观察客户端重试和故障转移机制。
五、容灾测试方案
1. Broker 节点故障
- 步骤:生产者持续发送消息。随机终止一个 Broker 节点。观察生产者是否自动切换到其他 Broker。恢复 Broker,检查消息同步情况。
2. 磁盘损坏测试
- 步骤:删除 Broker 存储路径下的消息文件(如 rm -rf store/commitlog/*)。重启 Broker,观察是否从副本(Slave)恢复数据。
3. NameServer 高可用测试
- 场景:停止部分 NameServer 节点,验证客户端是否能通过剩余 NameServer 发现路由信息。
六、测试结果分析
1. 性能瓶颈定位
- Broker 瓶颈:CPU 打满:升级 CPU 或优化消息处理逻辑。磁盘 IO 高:更换 SSD 或调整刷盘策略(异步刷盘)。
- 网络瓶颈:带宽不足:压缩消息体(compressed=true)或扩容网络。
- 客户端瓶颈:线程数不足:增加生产者/消费者线程数。
2. 优化建议
- Broker 配置优化:properties
# 调整发送/消费线程池大小 sendMessageThreadPoolNums=64 pullMessageThreadPoolNums=64 # 启用消息压缩 compressMsgBodyOverHowmuch=4096
- JVM 调优:
# 增加堆内存,启用 G1 垃圾回收 JAVA_OPT="${JAVA_OPT} -Xms8g -Xmx8g -XX:+UseG1GC"
- 客户端优化:批量发送消息(sendBatch 方法)。消费者提高并行度(consumeMessageBatchMaxSize)。
七、示例测试报告
# RocketMQ 测试报告 ## 1. 测试环境 - **Broker 集群**:3 主 3 从(8C16G,SSD 磁盘)。 - **NameServer**:3 节点(4C8G)。 - **压测工具**:RocketMQ Benchmark 脚本,JMeter。 ## 2. 性能测试结果 | **场景** | **TPS(生产/消费)** | **平均延迟(ms)** | **消息堆积量** | |----------------|---------------------|--------------------|----------------| | 单生产者 | 50,000 / - | 15 | - | | 单消费者 | - / 45,000 | 20 | 0 | | 主从切换 | 48,000 / 43,000 | 200(切换期间) | 1,200 | ## 3. 问题与优化 - **问题**:主从切换期间消息堆积 1,200 条。 - **优化**:调整消费者线程数从 32 增加到 64,堆积量降为 0。
八、注意事项
- 数据隔离:测试 Topic 与生产环境隔离,避免干扰。
- 日志监控:开启 Broker 日志(broker.log)和 GC 日志。
- 逐步加压:从低并发开始,逐步增加压力,避免瞬时高负载导致宕机。
通过以上方案,可全面验证 RocketMQ 的性能和可靠性,确保其满足生产环境需求。
开启新对话
进阶高级测试工程师 文章被收录于专栏
《高级软件测试工程师》专栏旨在为测试领域的从业者提供深入的知识和实践指导,帮助大家从基础的测试技能迈向高级测试专家的行列。 在本专栏中,主要涵盖的内容: 1. 如何设计和实施高效的测试策略; 2. 掌握自动化测试、性能测试和安全测试的核心技术; 3. 深入理解测试驱动开发(TDD)和行为驱动开发(BDD)的实践方法; 4. 测试团队的管理和协作能力。 ——For.Heart