9.1 消息队列选型对比

面试重要程度:⭐⭐⭐⭐⭐

常见提问方式: "你们项目中用的什么消息队列?为什么选择它?"

技术深度: 架构选型、性能对比、可靠性保证

预计阅读时间:30分钟

🎯 消息队列核心概念

什么是消息队列

消息队列(Message Queue,MQ)是一种应用程序间的通信方法,通过在消息的传输过程中保存消息的容器来实现。

核心作用:

  • 解耦:降低系统间的耦合度
  • 异步:提升系统响应速度
  • 削峰:应对流量高峰
  • 可靠性:保证消息不丢失

消息队列基本模型

/**
 * 点对点模式(Queue)
 */
@RabbitListener(queues = "order.queue")
public void processOrder(OrderMessage message) {
    log.info("Processing order: {}", message.getOrderId());
    // 只有一个消费者能收到这条消息
}

/**
 * 发布订阅模式(Topic)
 */
@RabbitListener(queues = "user.register.email")
public void sendEmail(UserRegisterEvent event) {
    log.info("Sending welcome email to: {}", event.getEmail());
}

@RabbitListener(queues = "user.register.sms")
public void sendSMS(UserRegisterEvent event) {
    log.info("Sending welcome SMS to: {}", event.getPhone());
}

🔄 主流消息队列对比

RabbitMQ详解

特点:

  • 基于AMQP协议,Erlang开发
  • 功能丰富,管理界面友好
  • 支持多种消息模式

核心配置:

@Configuration
public class RabbitMQConfig {
    
    // Direct Exchange - 精确匹配
    @Bean
    public DirectExchange orderDirectExchange() {
        return new DirectExchange("order.direct");
    }
    
    // Topic Exchange - 模式匹配
    @Bean
    public TopicExchange userTopicExchange() {
        return new TopicExchange("user.topic");
    }
    
    // 队列声明
    @Bean
    public Queue orderQueue() {
        return QueueBuilder.durable("order.process")
            .withArgument("x-message-ttl", 60000)
            .withArgument("x-max-length", 10000)
            .build();
    }
}

@Service
public class RabbitMQProducer {
    
    @Autowired
    private RabbitTemplate rabbitTemplate;
    
    public void sendOrderMessage(OrderMessage message) {
        MessageProperties properties = new MessageProperties();
        properties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
        properties.setExpiration("60000");
        
        Message rabbitMessage = new Message(
            JSON.toJSONBytes(message), properties);
        
        rabbitTemplate.send("order.direct", "order.create", rabbitMessage);
    }
}

可靠性保证:

@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
    RabbitTemplate template = new RabbitTemplate(connectionFactory);
    
    // 生产者确认
    template.setConfirmCallback((correlationData, ack, cause) -> {
        if (ack) {
            log.info("Message sent successfully: {}", correlationData);
        } else {
            log.error("Message send failed: {}", cause);
        }
    });
    
    // 消息返回确认
    template.setReturnsCallback(returned -> {
        log.error("Message returned: {}", returned.getMessage());
    });
    
    return template;
}

@RabbitListener(queues = "order.queue")
public void processOrder(OrderMessage message, Channel channel, 
                       @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) {
    try {
        orderService.processOrder(message);
        channel.basicAck(deliveryTag, false); // 手动确认
    } catch (Exception e) {
        channel.basicNack(deliveryTag, false, true); // 重新入队
    }
}

Kafka详解

特点:

  • 高吞吐量、低延迟
  • 分布式、可扩展
  • 适合大数据场景

生产者配置:

@Configuration
public class KafkaProducerConfig {
    
    @Bean
    public ProducerFactory<String, Object> producerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.RETRIES_CONFIG, 3);
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "gzip");
        
        return new DefaultKafkaProducerFactory<>(props);
    }
}

@Service
public class KafkaProducerService {
    
    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;
    
    public void sendOrderEvent(OrderEvent event) {
        kafkaTemplate.send("order-events", event.getOrderId(), event)
            .addCallback(
                result -> log.info("Order event sent: {}", event),
                failure -> log.error("Send failed: {}", event, failure)
            );
    }
}

消费者配置:

@KafkaListener(topics = "order-events", groupId = "order-service")
public void handleOrderEvent(OrderEvent event, 
                           @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
                           @Header(KafkaHeaders.OFFSET) long offset) {
    
    log.info("Received: partition={}, offset={}, event={}", 
        partition, offset, event);
    
    orderService.processOrderEvent(event);
}

// 批量消费
@KafkaListener(topics = "user-behavior", 
               containerFactory = "batchKafkaListenerContainerFactory")
public void handleBatch(List<UserBehaviorEvent> events) {
    log.info("Processing batch of {} events", events.size());
    analyticsService.processBatch(events);
}

RocketMQ详解

特点:

  • 阿里开源,Java开发
  • 支持事务消息、顺序消息
  • 丰富的消息类型

基本使用:

@Service
public class RocketMQProducerService {
    
    @Autowired
    private DefaultMQProducer producer;
    
    // 同步发送
    public void sendSyncMessage(String topic, String tag, Object message) {
        try {
            Message msg 

剩余60%内容,订阅专栏后可继续查看/也可单篇购买

Java面试圣经 文章被收录于专栏

Java面试圣经,带你练透java圣经

全部评论
欢迎讨论
点赞 回复 分享
发布于 09-06 11:27 江西

相关推荐

评论
点赞
2
分享

创作者周榜

更多
牛客网
牛客网在线编程
牛客网题解
牛客企业服务