9.1 消息队列选型对比
面试重要程度:⭐⭐⭐⭐⭐
常见提问方式: "你们项目中用的什么消息队列?为什么选择它?"
技术深度: 架构选型、性能对比、可靠性保证
预计阅读时间:30分钟
🎯 消息队列核心概念
什么是消息队列
消息队列(Message Queue,MQ)是一种应用程序间的通信方法,通过在消息的传输过程中保存消息的容器来实现。
核心作用:
- 解耦:降低系统间的耦合度
- 异步:提升系统响应速度
- 削峰:应对流量高峰
- 可靠性:保证消息不丢失
消息队列基本模型
/** * 点对点模式(Queue) */ @RabbitListener(queues = "order.queue") public void processOrder(OrderMessage message) { log.info("Processing order: {}", message.getOrderId()); // 只有一个消费者能收到这条消息 } /** * 发布订阅模式(Topic) */ @RabbitListener(queues = "user.register.email") public void sendEmail(UserRegisterEvent event) { log.info("Sending welcome email to: {}", event.getEmail()); } @RabbitListener(queues = "user.register.sms") public void sendSMS(UserRegisterEvent event) { log.info("Sending welcome SMS to: {}", event.getPhone()); }
🔄 主流消息队列对比
RabbitMQ详解
特点:
- 基于AMQP协议,Erlang开发
- 功能丰富,管理界面友好
- 支持多种消息模式
核心配置:
@Configuration public class RabbitMQConfig { // Direct Exchange - 精确匹配 @Bean public DirectExchange orderDirectExchange() { return new DirectExchange("order.direct"); } // Topic Exchange - 模式匹配 @Bean public TopicExchange userTopicExchange() { return new TopicExchange("user.topic"); } // 队列声明 @Bean public Queue orderQueue() { return QueueBuilder.durable("order.process") .withArgument("x-message-ttl", 60000) .withArgument("x-max-length", 10000) .build(); } } @Service public class RabbitMQProducer { @Autowired private RabbitTemplate rabbitTemplate; public void sendOrderMessage(OrderMessage message) { MessageProperties properties = new MessageProperties(); properties.setDeliveryMode(MessageDeliveryMode.PERSISTENT); properties.setExpiration("60000"); Message rabbitMessage = new Message( JSON.toJSONBytes(message), properties); rabbitTemplate.send("order.direct", "order.create", rabbitMessage); } }
可靠性保证:
@Bean public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) { RabbitTemplate template = new RabbitTemplate(connectionFactory); // 生产者确认 template.setConfirmCallback((correlationData, ack, cause) -> { if (ack) { log.info("Message sent successfully: {}", correlationData); } else { log.error("Message send failed: {}", cause); } }); // 消息返回确认 template.setReturnsCallback(returned -> { log.error("Message returned: {}", returned.getMessage()); }); return template; } @RabbitListener(queues = "order.queue") public void processOrder(OrderMessage message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) { try { orderService.processOrder(message); channel.basicAck(deliveryTag, false); // 手动确认 } catch (Exception e) { channel.basicNack(deliveryTag, false, true); // 重新入队 } }
Kafka详解
特点:
- 高吞吐量、低延迟
- 分布式、可扩展
- 适合大数据场景
生产者配置:
@Configuration public class KafkaProducerConfig { @Bean public ProducerFactory<String, Object> producerFactory() { Map<String, Object> props = new HashMap<>(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); props.put(ProducerConfig.ACKS_CONFIG, "all"); props.put(ProducerConfig.RETRIES_CONFIG, 3); props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true); props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384); props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "gzip"); return new DefaultKafkaProducerFactory<>(props); } } @Service public class KafkaProducerService { @Autowired private KafkaTemplate<String, Object> kafkaTemplate; public void sendOrderEvent(OrderEvent event) { kafkaTemplate.send("order-events", event.getOrderId(), event) .addCallback( result -> log.info("Order event sent: {}", event), failure -> log.error("Send failed: {}", event, failure) ); } }
消费者配置:
@KafkaListener(topics = "order-events", groupId = "order-service") public void handleOrderEvent(OrderEvent event, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition, @Header(KafkaHeaders.OFFSET) long offset) { log.info("Received: partition={}, offset={}, event={}", partition, offset, event); orderService.processOrderEvent(event); } // 批量消费 @KafkaListener(topics = "user-behavior", containerFactory = "batchKafkaListenerContainerFactory") public void handleBatch(List<UserBehaviorEvent> events) { log.info("Processing batch of {} events", events.size()); analyticsService.processBatch(events); }
RocketMQ详解
特点:
- 阿里开源,Java开发
- 支持事务消息、顺序消息
- 丰富的消息类型
基本使用:
@Service public class RocketMQProducerService { @Autowired private DefaultMQProducer producer; // 同步发送 public void sendSyncMessage(String topic, String tag, Object message) { try { Message msg = new Message(topic, tag, JSON.toJSONBytes(message)); SendResult result = producer.send(msg); log.info("Message sent: {}", result.getMsgId()); } catch (Exception e) { log.error("Send failed", e); } } // 顺序消息 public void sendOrderlyMessage(String topic, Object message, String orderKey) { try { Message msg = new Message(topic, JSON.toJSONBytes(message)); producer.send(msg, (mqs, msg1, arg) -> { String key = (String) arg; int index = Math.abs(key.hashCode()) % mqs.size(); return mqs.get(index); }, orderKey); } catch (Exception e) { log.error("Send orderly message failed", e); } } // 延迟消息 public void sendDelayMessage(String topic, Object message, int delayLevel) { try { Message msg = new Message(topic, JSON.toJSONBytes(message)); msg.setDelayTimeLevel(delayLevel); // 1s 5s 10s 30s 1m... producer.send(msg); } catch (Exception e) { log.error("Send delay message failed", e); } } }
事务消息:
@Service public class TransactionMessageService { private TransactionMQProducer transactionProducer; @PostConstruct public void init() { transactionProducer = new TransactionMQProducer("tx_producer_group"); transactionProducer.setTransactionListener(new TransactionListener() { @Override public LocalTransactionState executeLocalTransaction(Message msg, Object arg) { try { OrderContext context = (OrderContext) arg; orderService.createOrder(context.getOrder()); return LocalTransactionState.COMMIT_MESSAGE; } catch (Exception e) { return LocalTransactionState.ROLLBACK_MESSAGE; } } @Override public LocalTransactionState checkLocalTransaction(MessageExt msg) { String orderId = msg.getProperty("orderId"); boolean exists = orderService.orderExists(orderId); return exists ? LocalTransactionState.COMMIT_MESSAGE : LocalTransactionState.ROLLBACK_MESSAGE; } }); transactionProducer.start(); } public void sendTransactionMessage(Order order) { Message msg = new Message("order_topic", JSON.toJSONBytes(order)); msg.putUserProperty("orderId", order.getId()); OrderContext context = new OrderContext(order); transactionProducer.sendMessageInTransaction(msg, context); } }
📊 性能对比分析
吞吐量与延迟对比
/** * 性能测试结果(1KB消息,单机环境) */ public class PerformanceComparison { /* * 吞吐量对比: * RabbitMQ: 20,000 msg/s * Kafka: 100,000 msg/s (批量) * RocketMQ: 70,000 msg/s * * 延迟对比: * RabbitMQ: 1-5ms * Kafka: 2-10ms (批量), 0.5-2ms (单条) * RocketMQ: 1-3ms */ }
选型决策矩阵
/** * 根据业务场景选择消息队列 */ public MessageQueueType selectMessageQueue(BusinessScenario scenario) { // 高吞吐量场景 if (scenario.requiresHighThroughput()) { return MessageQueueType.KAFKA; } // 事务消息场景 if (scenario.requiresTransactionMessage()) { return MessageQueueType.ROCKETMQ; } // 复杂路由场景 if (scenario.requiresComplexRouting()) { return MessageQueueType.RABBITMQ; } // 顺序消息场景 if (scenario.requiresOrderedMessage()) { return MessageQueueType.ROCKETMQ; } return MessageQueueType.ROCKETMQ; // 默认推荐 }
特性对比表:
吞吐量 | 中 | 高 | 高 |
延迟 | 低 | 中 | 低 |
可靠性 | 高 | 高 | 高 |
消息顺序 | 支持 | 支持 | 支持 |
事务消息 | 不支持 | 不支持 | 支持 |
延迟消息 | 插件支持 | 不支持 | 支持 |
管理界面 | 友好 | 简单 | 一般 |
运维复杂度 | 中 | 高 | 中 |
🛡️ 消息可靠性保证
本地消息表模式
@Service public class ReliableMessageService { @Autowired private MessageRepository messageRepository; @Transactional public void sendReliableMessage(BusinessEvent event) { // 1. 保存到本地消息表 LocalMessage localMessage = new LocalMessage(); localMessage.setId(UUID.randomUUID().toString()); localMessage.setTopic(event.getTopic()); localMessage.setContent(JSON.toJSONString(event)); localMessage.setStatus(MessageStatus.PENDING); messageRepository.save(localMessage); // 2. 发送消息 try { SendResult result = producer.send(event.getTopic(), event); localMessage.setStatus(MessageStatus.SENT); localMessage.setMessageId(result.getMsgId()); messageRepository.save(localMessage); } catch (Exception e) { log.error("Message send failed", e); throw new MessageSendException("Send failed", e); } } // 定时重发失败消息 @Scheduled(fixedRate = 30000) public void retryFailedMessages() { List<LocalMessage> failedMessages = messageRepository .findByStatusAndCreateTimeBefore( MessageStatus.PENDING, LocalDateTime.now().minusMinutes(5) ); for (LocalMessage message : failedMessages) { try { BusinessEvent event = JSON.parseObject( message.getContent(), BusinessEvent.class); producer.send(message.getTopic(), event); message.setStatus(MessageStatus.SENT); messageRepository.save(message); } catch (Exception e) { if (message.getRetryCount() >= 3) { message.setStatus(MessageStatus.FAILED); messageRepository.save(message); } } } } }
幂等消费
@Component public class IdempotentConsumer { @Autowired private ConsumeRecordRepository consumeRecordRepository; @KafkaListener(topics = "order-events") public void handleOrderEvent(OrderEvent event, @Header(KafkaHeaders.MESSAGE_KEY) String messageKey) { // 生成消费唯一键 String consumeKey = generateConsumeKey( event.getTopic(), messageKey, event.getEventId()); // 检查是否已消费 if (consumeRecordRepository.existsByConsumeKey(consumeKey)) { log.info("Message already consumed: {}", consumeKey); return; } try { // 执行业务逻辑 orderService.processOrderEvent(event); // 记录消费状态 ConsumeRecord record = new ConsumeRecord(); record.setConsumeKey(consumeKey); record.setStatus(ConsumeStatus.SUCCESS); record.setConsumeTime(LocalDateTime.now()); consumeRecordRepository.save(record); } catch (Exception e) { log.error("Process failed: {}", event, e); throw e; // 触发重试 } } private String generateConsumeKey(String topic, String messageKey, String eventId) { return topic + ":" + messageKey + ":" + eventId; } }
💡 面试回答要点
标准回答模板
第一部分:技术选型
"我们项目选择了RocketMQ,主要考虑以下因素: 1. 业务需要事务消息保证数据一致性 2. 需要顺序消息处理订单状态变更 3. 支持延迟消息实现定时任务 4. 阿里开源,文档完善,社区支持好"
第二部分:可靠性保证
"消息可靠性通过以下机制保证: 1. 生产者:本地消息表 + 定时重试 2. 存储:消息持久化到磁盘 3. 消费者:手动确认 + 幂等处理 4. 监控:死信队列处理异常消息"
第三部分:性能优化
"性能优化策略: 1. 批量发送减少网络开销 2. 异步发送提升吞吐量 3. 消息压缩减少存储空间 4. 合理设置分区提高并发度"
核心要点总结:
- ✅ 掌握三大主流MQ的特点和适用场景
- ✅ 理解消息可靠性保证机制
- ✅ 能够根据业务需求进行技术选型
- ✅ 具备生产环境的实践经验
Java面试圣经 文章被收录于专栏
Java面试圣经