9.1 消息队列选型对比
面试重要程度:⭐⭐⭐⭐⭐
常见提问方式: "你们项目中用的什么消息队列?为什么选择它?"
技术深度: 架构选型、性能对比、可靠性保证
预计阅读时间:30分钟
🎯 消息队列核心概念
什么是消息队列
消息队列(Message Queue,MQ)是一种应用程序间的通信方法,通过在消息的传输过程中保存消息的容器来实现。
核心作用:
- 解耦:降低系统间的耦合度
- 异步:提升系统响应速度
- 削峰:应对流量高峰
- 可靠性:保证消息不丢失
消息队列基本模型
/** * 点对点模式(Queue) */ @RabbitListener(queues = "order.queue") public void processOrder(OrderMessage message) { log.info("Processing order: {}", message.getOrderId()); // 只有一个消费者能收到这条消息 } /** * 发布订阅模式(Topic) */ @RabbitListener(queues = "user.register.email") public void sendEmail(UserRegisterEvent event) { log.info("Sending welcome email to: {}", event.getEmail()); } @RabbitListener(queues = "user.register.sms") public void sendSMS(UserRegisterEvent event) { log.info("Sending welcome SMS to: {}", event.getPhone()); }
🔄 主流消息队列对比
RabbitMQ详解
特点:
- 基于AMQP协议,Erlang开发
- 功能丰富,管理界面友好
- 支持多种消息模式
核心配置:
@Configuration public class RabbitMQConfig { // Direct Exchange - 精确匹配 @Bean public DirectExchange orderDirectExchange() { return new DirectExchange("order.direct"); } // Topic Exchange - 模式匹配 @Bean public TopicExchange userTopicExchange() { return new TopicExchange("user.topic"); } // 队列声明 @Bean public Queue orderQueue() { return QueueBuilder.durable("order.process") .withArgument("x-message-ttl", 60000) .withArgument("x-max-length", 10000) .build(); } } @Service public class RabbitMQProducer { @Autowired private RabbitTemplate rabbitTemplate; public void sendOrderMessage(OrderMessage message) { MessageProperties properties = new MessageProperties(); properties.setDeliveryMode(MessageDeliveryMode.PERSISTENT); properties.setExpiration("60000"); Message rabbitMessage = new Message( JSON.toJSONBytes(message), properties); rabbitTemplate.send("order.direct", "order.create", rabbitMessage); } }
可靠性保证:
@Bean public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) { RabbitTemplate template = new RabbitTemplate(connectionFactory); // 生产者确认 template.setConfirmCallback((correlationData, ack, cause) -> { if (ack) { log.info("Message sent successfully: {}", correlationData); } else { log.error("Message send failed: {}", cause); } }); // 消息返回确认 template.setReturnsCallback(returned -> { log.error("Message returned: {}", returned.getMessage()); }); return template; } @RabbitListener(queues = "order.queue") public void processOrder(OrderMessage message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) { try { orderService.processOrder(message); channel.basicAck(deliveryTag, false); // 手动确认 } catch (Exception e) { channel.basicNack(deliveryTag, false, true); // 重新入队 } }
Kafka详解
特点:
- 高吞吐量、低延迟
- 分布式、可扩展
- 适合大数据场景
生产者配置:
@Configuration public class KafkaProducerConfig { @Bean public ProducerFactory<String, Object> producerFactory() { Map<String, Object> props = new HashMap<>(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); props.put(ProducerConfig.ACKS_CONFIG, "all"); props.put(ProducerConfig.RETRIES_CONFIG, 3); props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true); props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384); props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "gzip"); return new DefaultKafkaProducerFactory<>(props); } } @Service public class KafkaProducerService { @Autowired private KafkaTemplate<String, Object> kafkaTemplate; public void sendOrderEvent(OrderEvent event) { kafkaTemplate.send("order-events", event.getOrderId(), event) .addCallback( result -> log.info("Order event sent: {}", event), failure -> log.error("Send failed: {}", event, failure) ); } }
消费者配置:
@KafkaListener(topics = "order-events", groupId = "order-service") public void handleOrderEvent(OrderEvent event, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition, @Header(KafkaHeaders.OFFSET) long offset) { log.info("Received: partition={}, offset={}, event={}", partition, offset, event); orderService.processOrderEvent(event); } // 批量消费 @KafkaListener(topics = "user-behavior", containerFactory = "batchKafkaListenerContainerFactory") public void handleBatch(List<UserBehaviorEvent> events) { log.info("Processing batch of {} events", events.size()); analyticsService.processBatch(events); }
RocketMQ详解
特点:
- 阿里开源,Java开发
- 支持事务消息、顺序消息
- 丰富的消息类型
基本使用:
@Service public class RocketMQProducerService { @Autowired private DefaultMQProducer producer; // 同步发送 public void sendSyncMessage(String topic, String tag, Object message) { try { Message msg
剩余60%内容,订阅专栏后可继续查看/也可单篇购买
Java面试圣经 文章被收录于专栏
Java面试圣经,带你练透java圣经