9.1 消息队列选型对比

面试重要程度:⭐⭐⭐⭐⭐

常见提问方式: "你们项目中用的什么消息队列?为什么选择它?"

技术深度: 架构选型、性能对比、可靠性保证

预计阅读时间:30分钟

🎯 消息队列核心概念

什么是消息队列

消息队列(Message Queue,MQ)是一种应用程序间的通信方法,通过在消息的传输过程中保存消息的容器来实现。

核心作用:

  • 解耦:降低系统间的耦合度
  • 异步:提升系统响应速度
  • 削峰:应对流量高峰
  • 可靠性:保证消息不丢失

消息队列基本模型

/**
 * 点对点模式(Queue)
 */
@RabbitListener(queues = "order.queue")
public void processOrder(OrderMessage message) {
    log.info("Processing order: {}", message.getOrderId());
    // 只有一个消费者能收到这条消息
}

/**
 * 发布订阅模式(Topic)
 */
@RabbitListener(queues = "user.register.email")
public void sendEmail(UserRegisterEvent event) {
    log.info("Sending welcome email to: {}", event.getEmail());
}

@RabbitListener(queues = "user.register.sms")
public void sendSMS(UserRegisterEvent event) {
    log.info("Sending welcome SMS to: {}", event.getPhone());
}

🔄 主流消息队列对比

RabbitMQ详解

特点:

  • 基于AMQP协议,Erlang开发
  • 功能丰富,管理界面友好
  • 支持多种消息模式

核心配置:

@Configuration
public class RabbitMQConfig {
    
    // Direct Exchange - 精确匹配
    @Bean
    public DirectExchange orderDirectExchange() {
        return new DirectExchange("order.direct");
    }
    
    // Topic Exchange - 模式匹配
    @Bean
    public TopicExchange userTopicExchange() {
        return new TopicExchange("user.topic");
    }
    
    // 队列声明
    @Bean
    public Queue orderQueue() {
        return QueueBuilder.durable("order.process")
            .withArgument("x-message-ttl", 60000)
            .withArgument("x-max-length", 10000)
            .build();
    }
}

@Service
public class RabbitMQProducer {
    
    @Autowired
    private RabbitTemplate rabbitTemplate;
    
    public void sendOrderMessage(OrderMessage message) {
        MessageProperties properties = new MessageProperties();
        properties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
        properties.setExpiration("60000");
        
        Message rabbitMessage = new Message(
            JSON.toJSONBytes(message), properties);
        
        rabbitTemplate.send("order.direct", "order.create", rabbitMessage);
    }
}

可靠性保证:

@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
    RabbitTemplate template = new RabbitTemplate(connectionFactory);
    
    // 生产者确认
    template.setConfirmCallback((correlationData, ack, cause) -> {
        if (ack) {
            log.info("Message sent successfully: {}", correlationData);
        } else {
            log.error("Message send failed: {}", cause);
        }
    });
    
    // 消息返回确认
    template.setReturnsCallback(returned -> {
        log.error("Message returned: {}", returned.getMessage());
    });
    
    return template;
}

@RabbitListener(queues = "order.queue")
public void processOrder(OrderMessage message, Channel channel, 
                       @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) {
    try {
        orderService.processOrder(message);
        channel.basicAck(deliveryTag, false); // 手动确认
    } catch (Exception e) {
        channel.basicNack(deliveryTag, false, true); // 重新入队
    }
}

Kafka详解

特点:

  • 高吞吐量、低延迟
  • 分布式、可扩展
  • 适合大数据场景

生产者配置:

@Configuration
public class KafkaProducerConfig {
    
    @Bean
    public ProducerFactory<String, Object> producerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.RETRIES_CONFIG, 3);
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "gzip");
        
        return new DefaultKafkaProducerFactory<>(props);
    }
}

@Service
public class KafkaProducerService {
    
    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;
    
    public void sendOrderEvent(OrderEvent event) {
        kafkaTemplate.send("order-events", event.getOrderId(), event)
            .addCallback(
                result -> log.info("Order event sent: {}", event),
                failure -> log.error("Send failed: {}", event, failure)
            );
    }
}

消费者配置:

@KafkaListener(topics = "order-events", groupId = "order-service")
public void handleOrderEvent(OrderEvent event, 
                           @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
                           @Header(KafkaHeaders.OFFSET) long offset) {
    
    log.info("Received: partition={}, offset={}, event={}", 
        partition, offset, event);
    
    orderService.processOrderEvent(event);
}

// 批量消费
@KafkaListener(topics = "user-behavior", 
               containerFactory = "batchKafkaListenerContainerFactory")
public void handleBatch(List<UserBehaviorEvent> events) {
    log.info("Processing batch of {} events", events.size());
    analyticsService.processBatch(events);
}

RocketMQ详解

特点:

  • 阿里开源,Java开发
  • 支持事务消息、顺序消息
  • 丰富的消息类型

基本使用:

@Service
public class RocketMQProducerService {
    
    @Autowired
    private DefaultMQProducer producer;
    
    // 同步发送
    public void sendSyncMessage(String topic, String tag, Object message) {
        try {
            Message msg = new Message(topic, tag, JSON.toJSONBytes(message));
            SendResult result = producer.send(msg);
            log.info("Message sent: {}", result.getMsgId());
        } catch (Exception e) {
            log.error("Send failed", e);
        }
    }
    
    // 顺序消息
    public void sendOrderlyMessage(String topic, Object message, String orderKey) {
        try {
            Message msg = new Message(topic, JSON.toJSONBytes(message));
            producer.send(msg, (mqs, msg1, arg) -> {
                String key = (String) arg;
                int index = Math.abs(key.hashCode()) % mqs.size();
                return mqs.get(index);
            }, orderKey);
        } catch (Exception e) {
            log.error("Send orderly message failed", e);
        }
    }
    
    // 延迟消息
    public void sendDelayMessage(String topic, Object message, int delayLevel) {
        try {
            Message msg = new Message(topic, JSON.toJSONBytes(message));
            msg.setDelayTimeLevel(delayLevel); // 1s 5s 10s 30s 1m...
            producer.send(msg);
        } catch (Exception e) {
            log.error("Send delay message failed", e);
        }
    }
}

事务消息:

@Service
public class TransactionMessageService {
    
    private TransactionMQProducer transactionProducer;
    
    @PostConstruct
    public void init() {
        transactionProducer = new TransactionMQProducer("tx_producer_group");
        
        transactionProducer.setTransactionListener(new TransactionListener() {
            @Override
            public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
                try {
                    OrderContext context = (OrderContext) arg;
                    orderService.createOrder(context.getOrder());
                    return LocalTransactionState.COMMIT_MESSAGE;
                } catch (Exception e) {
                    return LocalTransactionState.ROLLBACK_MESSAGE;
                }
            }
            
            @Override
            public LocalTransactionState checkLocalTransaction(MessageExt msg) {
                String orderId = msg.getProperty("orderId");
                boolean exists = orderService.orderExists(orderId);
                return exists ? LocalTransactionState.COMMIT_MESSAGE 
                             : LocalTransactionState.ROLLBACK_MESSAGE;
            }
        });
        
        transactionProducer.start();
    }
    
    public void sendTransactionMessage(Order order) {
        Message msg = new Message("order_topic", JSON.toJSONBytes(order));
        msg.putUserProperty("orderId", order.getId());
        
        OrderContext context = new OrderContext(order);
        transactionProducer.sendMessageInTransaction(msg, context);
    }
}

📊 性能对比分析

吞吐量与延迟对比

/**
 * 性能测试结果(1KB消息,单机环境)
 */
public class PerformanceComparison {
    /*
     * 吞吐量对比:
     * RabbitMQ:  20,000 msg/s
     * Kafka:     100,000 msg/s (批量)
     * RocketMQ:  70,000 msg/s
     * 
     * 延迟对比:
     * RabbitMQ:  1-5ms
     * Kafka:     2-10ms (批量), 0.5-2ms (单条)
     * RocketMQ:  1-3ms
     */
}

选型决策矩阵

/**
 * 根据业务场景选择消息队列
 */
public MessageQueueType selectMessageQueue(BusinessScenario scenario) {
    
    // 高吞吐量场景
    if (scenario.requiresHighThroughput()) {
        return MessageQueueType.KAFKA;
    }
    
    // 事务消息场景
    if (scenario.requiresTransactionMessage()) {
        return MessageQueueType.ROCKETMQ;
    }
    
    // 复杂路由场景
    if (scenario.requiresComplexRouting()) {
        return MessageQueueType.RABBITMQ;
    }
    
    // 顺序消息场景
    if (scenario.requiresOrderedMessage()) {
        return MessageQueueType.ROCKETMQ;
    }
    
    return MessageQueueType.ROCKETMQ; // 默认推荐
}

特性对比表:

吞吐量

延迟

可靠性

消息顺序

支持

支持

支持

事务消息

不支持

不支持

支持

延迟消息

插件支持

不支持

支持

管理界面

友好

简单

一般

运维复杂度

🛡️ 消息可靠性保证

本地消息表模式

@Service
public class ReliableMessageService {
    
    @Autowired
    private MessageRepository messageRepository;
    
    @Transactional
    public void sendReliableMessage(BusinessEvent event) {
        // 1. 保存到本地消息表
        LocalMessage localMessage = new LocalMessage();
        localMessage.setId(UUID.randomUUID().toString());
        localMessage.setTopic(event.getTopic());
        localMessage.setContent(JSON.toJSONString(event));
        localMessage.setStatus(MessageStatus.PENDING);
        messageRepository.save(localMessage);
        
        // 2. 发送消息
        try {
            SendResult result = producer.send(event.getTopic(), event);
            localMessage.setStatus(MessageStatus.SENT);
            localMessage.setMessageId(result.getMsgId());
            messageRepository.save(localMessage);
        } catch (Exception e) {
            log.error("Message send failed", e);
            throw new MessageSendException("Send failed", e);
        }
    }
    
    // 定时重发失败消息
    @Scheduled(fixedRate = 30000)
    public void retryFailedMessages() {
        List<LocalMessage> failedMessages = messageRepository
            .findByStatusAndCreateTimeBefore(
                MessageStatus.PENDING, 
                LocalDateTime.now().minusMinutes(5)
            );
        
        for (LocalMessage message : failedMessages) {
            try {
                BusinessEvent event = JSON.parseObject(
                    message.getContent(), BusinessEvent.class);
                producer.send(message.getTopic(), event);
                
                message.setStatus(MessageStatus.SENT);
                messageRepository.save(message);
            } catch (Exception e) {
                if (message.getRetryCount() >= 3) {
                    message.setStatus(MessageStatus.FAILED);
                    messageRepository.save(message);
                }
            }
        }
    }
}

幂等消费

@Component
public class IdempotentConsumer {
    
    @Autowired
    private ConsumeRecordRepository consumeRecordRepository;
    
    @KafkaListener(topics = "order-events")
    public void handleOrderEvent(OrderEvent event, 
                               @Header(KafkaHeaders.MESSAGE_KEY) String messageKey) {
        
        // 生成消费唯一键
        String consumeKey = generateConsumeKey(
            event.getTopic(), messageKey, event.getEventId());
        
        // 检查是否已消费
        if (consumeRecordRepository.existsByConsumeKey(consumeKey)) {
            log.info("Message already consumed: {}", consumeKey);
            return;
        }
        
        try {
            // 执行业务逻辑
            orderService.processOrderEvent(event);
            
            // 记录消费状态
            ConsumeRecord record = new ConsumeRecord();
            record.setConsumeKey(consumeKey);
            record.setStatus(ConsumeStatus.SUCCESS);
            record.setConsumeTime(LocalDateTime.now());
            consumeRecordRepository.save(record);
            
        } catch (Exception e) {
            log.error("Process failed: {}", event, e);
            throw e; // 触发重试
        }
    }
    
    private String generateConsumeKey(String topic, String messageKey, String eventId) {
        return topic + ":" + messageKey + ":" + eventId;
    }
}

💡 面试回答要点

标准回答模板

第一部分:技术选型

"我们项目选择了RocketMQ,主要考虑以下因素:
1. 业务需要事务消息保证数据一致性
2. 需要顺序消息处理订单状态变更
3. 支持延迟消息实现定时任务
4. 阿里开源,文档完善,社区支持好"

第二部分:可靠性保证

"消息可靠性通过以下机制保证:
1. 生产者:本地消息表 + 定时重试
2. 存储:消息持久化到磁盘
3. 消费者:手动确认 + 幂等处理
4. 监控:死信队列处理异常消息"

第三部分:性能优化

"性能优化策略:
1. 批量发送减少网络开销
2. 异步发送提升吞吐量
3. 消息压缩减少存储空间
4. 合理设置分区提高并发度"

核心要点总结:

  • ✅ 掌握三大主流MQ的特点和适用场景
  • ✅ 理解消息可靠性保证机制
  • ✅ 能够根据业务需求进行技术选型
  • ✅ 具备生产环境的实践经验
Java面试圣经 文章被收录于专栏

Java面试圣经

全部评论

相关推荐

昨天 22:06
东北大学 Java
20min&nbsp;实习怎么用redis+token实现登录的?Redis缓存token这种存储方式的弊端,存在什么安全隐患?这种方式的弊端后续怎么去解决?Redis缓存token业务层面会有哪些风险,业务层面的风险怎么解决?Redis高并发、低耗时的底层是因为什么机制?Redis主从同步的逻辑是什么,主从同步有哪几种方式,持久化的方式,最常用哪些方式?Redis支持事务吗,怎么支持?慢查询怎么定位和规避,在日常开发情况下,怎么做规避,有没有关于SQL的最佳实践、最佳原理。20min场景题在抖音里面有一个关注功能,设计关注跟取消关注功能,怎么去设计,包括底层的设计、存储设计。对于用户的规模不一样的情况(小博主、大博主),底层在设计的时候会有什么差异?一个网红博主,发了一条动态,怎么去发送给粉丝?上游怎么去消费发的这些消息?5min开放题未来职业规划+个人优势10+min手撕输出一个数组的全排列&nbsp;a&nbsp;b&nbsp;c&nbsp;-&gt;&nbsp;abc&nbsp;acb&nbsp;bac&nbsp;bca&nbsp;cab&nbsp;cba第二天挂基本全是场景题和设计方法,看似很开放,但还是要答出来面试官想听到的点,我感觉我说的挺对的,实际上可能最开始回答的方向就不对,讲了很多系统设计上的思考,忽视了业务方向的思考。难难难,实在是太难了,有一种有力没处使的感觉。已经换部门重新从一面开始了
求offer的花生米...:面字节太累了,剪映飞书全都是最后一轮挂了,心态都炸了
查看12道真题和解析
点赞 评论 收藏
分享
评论
点赞
2
分享

创作者周榜

更多
牛客网
牛客网在线编程
牛客网题解
牛客企业服务