Java秋招面试之消息队列与分布式话题
第9章 消息队列与分布式
面试重要程度:⭐⭐⭐⭐
常见提问方式:消息可靠性保证、分布式事务、服务治理
预计阅读时间:40分钟
开场白
兄弟,消息队列和分布式系统是现代后端架构的核心!随着微服务架构的普及,这块知识在面试中的重要性越来越高。不管是RabbitMQ、Kafka,还是分布式事务、服务治理,都是面试官爱考的重点。
今天我们就把这些分布式核心技术搞透,让你在面试中展现出对大型系统架构的深度理解。
📨 9.1 消息队列选型对比
RabbitMQ vs Kafka vs RocketMQ
面试必问:
面试官:"常见的消息队列有哪些?各自的特点和适用场景是什么?"
三大消息队列对比:
开发语言 |
Erlang |
Scala/Java |
Java |
协议支持 |
AMQP, STOMP, MQTT |
自定义协议 |
自定义协议 |
消息顺序 |
支持 |
分区内有序 |
支持 |
消息可靠性 |
高 |
高 |
高 |
性能 |
万级QPS |
十万级QPS |
十万级QPS |
延时 |
微秒级 |
毫秒级 |
毫秒级 |
可用性 |
高(镜像队列) |
高(副本机制) |
高(主从架构) |
消息回溯 |
不支持 |
支持 |
支持 |
消息过滤 |
不支持 |
不支持 |
支持 |
事务消息 |
支持 |
支持 |
支持 |
定时消息 |
插件支持 |
不支持 |
支持 |
使用场景分析:
// RabbitMQ适用场景 /* 1. 业务逻辑复杂,需要多种消息模式 2. 对消息可靠性要求极高 3. 需要复杂的路由规则 4. 系统规模中等,QPS要求不是特别高 */ // Kafka适用场景 /* 1. 大数据处理,日志收集 2. 高吞吐量场景 3. 流式数据处理 4. 需要消息回溯功能 */ // RocketMQ适用场景 /* 1. 电商、金融等对可靠性要求高的场景 2. 需要事务消息 3. 需要定时消息、延时消息 4. 需要消息过滤功能 */
RabbitMQ核心概念
AMQP模型:
@Configuration
@EnableRabbit
public class RabbitMQConfig {
// 1. 直连交换机(Direct Exchange)
@Bean
public DirectExchange directExchange() {
return new DirectExchange("direct.exchange", true, false);
}
// 2. 主题交换机(Topic Exchange)
@Bean
public TopicExchange topicExchange() {
return new TopicExchange("topic.exchange", true, false);
}
// 3. 扇形交换机(Fanout Exchange)
@Bean
public FanoutExchange fanoutExchange() {
return new FanoutExchange("fanout.exchange", true, false);
}
// 4. 头部交换机(Headers Exchange)
@Bean
public HeadersExchange headersExchange() {
return new HeadersExchange("headers.exchange", true, false);
}
// 队列定义
@Bean
public Queue orderQueue() {
return QueueBuilder.durable("order.queue")
.withArgument("x-message-ttl", 60000) // 消息TTL
.withArgument("x-max-length", 1000) // 队列最大长度
.build();
}
@Bean
public Queue deadLetterQueue() {
return QueueBuilder.durable("order.dlq").build();
}
// 绑定关系
@Bean
public Binding orderBinding() {
return BindingBuilder.bind(orderQueue())
.to(directExchange())
.with("order.create");
}
// 死信队列绑定
@Bean
public Queue orderQueueWithDLX() {
return QueueBuilder.durable("order.queue.dlx")
.withArgument("x-dead-letter-exchange", "dlx.exchange")
.withArgument("x-dead-letter-routing-key", "order.dead")
.build();
}
}
消息生产者:
@Service
public class OrderMessageProducer {
@Autowired
private RabbitTemplate rabbitTemplate;
// 发送普通消息
public void sendOrderMessage(Order order) {
rabbitTemplate.convertAndSend("direct.exchange", "order.create", order);
}
// 发送延时消息
public void sendDelayMessage(Order order, int delaySeconds) {
rabbitTemplate.convertAndSend("direct.exchange", "order.delay", order, message -> {
message.getMessageProperties().setExpiration(String.valueOf(delaySeconds * 1000));
return message;
});
}
// 发送事务消息
@Transactional
public void sendTransactionalMessage(Order order) {
// 本地事务操作
orderService.saveOrder(order);
// 发送消息(在同一事务中)
rabbitTemplate.convertAndSend("direct.exchange", "order.create", order);
}
// 发送确认消息
public void sendConfirmMessage(Order order) {
// 设置确认回调
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
if (ack) {
log.info("消息发送成功: {}", correlationData);
} else {
log.error("消息发送失败: {}, 原因: {}", correlationData, cause);
// 重试或记录失败日志
}
});
// 设置返回回调
rabbitTemplate.setReturnsCallback(returned -> {
log.error("消息被退回: {}", returned);
// 处理被退回的消息
});
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
rabbitTemplate.convertAndSend("direct.exchange", "order.create", order, correlationData);
}
}
消息消费者:
@Component
public class OrderMessageConsumer {
@Autowired
private OrderService orderService;
// 基本消费
@RabbitListener(queues = "order.queue")
public void handleOrderMessage(Order order, Channel channel,
@Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) {
try {
log.info("收到订单消息: {}", order);
orderService.processOrder(order);
// 手动确认
channel.basicAck(deliveryTag, false);
} catch (Exception e) {
log.error("处理订单消息失败", e);
try {
// 拒绝消息,重新入队
channel.basicNack(deliveryTag, false, true);
} catch (IOException ioException) {
log.error("拒绝消息失败", ioException);
}
}
}
// 批量消费
@RabbitListener(queues = "order.batch.queue",
containerFactory = "batchRabbitListenerContainerFactory")
public void handleBatchOrderMessages(List<Order> orders) {
log.info("批量处理订单消息,数量: {}", orders.size());
orderService.batchProcessOrders(orders);
}
// 死信队列消费
@RabbitListener(queues = "order.dlq")
public void handleDeadLetterMessage(Order order,
@Header Map<String, Object> headers) {
log.warn("处理死信消息: {}, headers: {}", order, headers);
// 记录失败原因
String reason = (String) headers.get("x-first-death-reason");
orderService.recordFailedOrder(order, reason);
// 可以选择重试或人工处理
}
}
Kafka核心概念
Kafka架构:
@Configuration
@EnableKafka
public class KafkaConfig {
// 生产者配置
@Bean
public ProducerFactory<String, Object> producerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
// 可靠性配置
props.put(ProducerConfig.ACKS_CONFIG, "all"); // 等待所有副本确认
props.put(ProducerConfig.RETRIES_CONFIG, 3); // 重试次数
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true); // 幂等性
// 性能配置
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384); // 批次大小
props.put(ProducerConfig.LINGER_MS_CONFIG, 10); // 等待时间
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432); // 缓冲区大小
return new DefaultKafkaProducerFactory<>(props);
}
@Bean
public KafkaTemplate<String, Object> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
// 消费者配置
@Bean
public ConsumerFactory<String, Object> consumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "order-service");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
// 消费配置
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); // 手动提交
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100); // 每次拉取记录数
return new DefaultKafkaConsumerFactory<>(props);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, Object> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
// 并发配置
factory.setConcurrency(3); // 3个消费者线程
// 手动确认模式
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
return factory;
}
}
Kafka生产者:
@Service
public class KafkaOrderProducer {
@Autowired
private KafkaTemplate<String, Object> kafkaTemplate;
// 发送消息
public void sendOrderEvent(String topic, Order order) {
kafkaTemplate.send(topic, order.getId().toString(), order)
.addCallback(
result -> log.info("消息发送成功: {}", result),
failure -> log.error("消息发送失败", failure)
);
}
// 发送事务消息
@Transactional
public void sendTransactionalOrderEvent(Order order) {
kafkaTemplate.executeInTransaction(operations -> {
// 发送多个相关消息
operations.send("order-created", order.getId().toString(), order);
operations.send("inventory-update", order.getProductId().toString(),
new InventoryUpdateEvent(order.getProductId(), -order.getQuantity()));
operations.send("payment-request", order.getId().toString(),
new PaymentRequestEvent(order.getId(), order.getAmount()));
return true;
});
}
// 发送分区消息
public void sendPartitionedMessage(Order order) {
// 根据用户ID分区,保证同一用户的消息有序
int partition = Math.abs(order.getUserId().hashCode()) % 3;
kafkaTemplate.send("order-events", partition, order.getId().toString(), order);
}
}
Kafka消费者:
@Component
public class KafkaOrderConsumer {
@Autowired
private OrderService orderService;
// 基本消费
@KafkaListener(topics = "order-created", groupId = "order-service")
public void handleOrderCreated(ConsumerRecord<String, Order> record, Acknowledgment ack) {
try {
Order order = record.value();
log.info("处理订单创建事件: partition={}, offset={}, order={}",
record.partition(), record.offset(), order);
orderService.handleOrderCreated(order);
// 手动确认
ack.acknowledge();
} catch (Exception e) {
log.error("处理订单创建事件失败", e);
// 不确认,消息会重新消费
}
}
// 批量消费
@KafkaListener(topics = "order-events",
containerFactory = "batchKafkaListenerContainerFactory")
public void handleOrderEventsBatch(List<ConsumerRecord<String, Order>> records,
Acknowledgment ack) {
log.info("批量处理订单事件,数量: {}", records.size());
List<Order> orders = records.stream()
.map(ConsumerRecord::value)
.collect(Collectors.toList());
orderService.batchProcessOrders(orders);
ack.acknowledge();
}
// 错误处理
@KafkaListener(topics = "order-created")
public void handleOrderCreatedWithRetry(Order order,
@Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
@Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
@Header(KafkaHeaders.OFFSET) long offset) {
try {
orderService.handleOrderCreated(order);
} catch (RetryableException e) {
log.warn("处理失败,将重试: topic={}, partition={}, offset={}", topic, partition, offset);
throw e; // 抛出异常,触发重试
} catch (Exception e) {
log.error("处理失败,发送到死信队列: topic={}, partition={}, offset={}", topic, partition, offset);
sendToDeadLetterTopic(order, e);
}
}
private void sendToDeadLetterTopic(Order order, Exception e) {
// 发送到死信主题
kafkaTemplate.send("order-created-dlt", order.getId().toString(),
new DeadLetterMessage(order, e.getMessage()));
}
}
🔄 9.2 分布式事务解决方案
2PC、3PC、TCC、Saga模式
面试重点:
面试官:"分布式事务有哪些解决方案?各自的优缺点是什么?"
分布式事务对比:
2PC |
强一致 |
低 |
低 |
低 |
小规模系统 |
3PC |
强一致 |
中 |
低 |
中 |
改进2PC |
TCC |
最终一致 |
高 |
中 |
高 |
金融支付 |
Saga |
最终一致 |
高 |
高 |
中 |
长流程业务 |
消息事务 |
最终一致 |
高 |
高 |
中 |
异步场景 |
TCC模式实现
TCC框架实现:
// TCC接口定义
public interface TccTransaction {
/**
* Try阶段:尝试执行业务,预留资源
*/
boolean tryExecute(TccContext context);
/**
* Confirm阶段:确认执行业务,提交资源
*/
boolean confirmExecute(TccContext context);
/**
* Cancel阶段:取消执行业务,释放资源
*/
boolean cancelExecute(TccContext context);
}
// 账户服务TCC实现
@Service
public class AccountTccService implements TccTransaction {
@Autowired
private AccountMapper accountMapper;
@Autowired
private AccountFreezeMapper freezeMapper;
@Override
public boolean tryExecute(TccContext context) {
TransferRequest request = context.getRequest(TransferRequest.class);
try {
// 1. 检查账户余额
Account account = accountMapper.selectById(request.getFromAccountId());
if (account.getBalance().compareTo(request.getAmount()) < 0) {
return false; // 余额不足
}
// 2. 冻结资金
AccountFreeze freeze = new AccountFreeze();
freeze.setTransactionId(context.getTransactionId());
freeze.setAccountId(request.getFromAccountId());
freeze.setAmount(request.getAmount());
freeze.setStatus(FreezeStatus.FROZEN);
freeze.setCreateTime(LocalDateTime.now());
freezeMapper.insert(freeze);
// 3. 更新账户余额(预扣)
account.setBalance(account.getBalance().subtract(request.getAmount()));
account.setFrozenAmount(account.getFrozenAmount().add(request.getAmount()));
accountMapper.updateById(account);
log.info("TCC Try阶段成功: transactionId={}, amount={}",
context.getTransactionId(), request.getAmount());
return true;
} catch (Exception e) {
log.error("TCC Try阶段失败", e);
return false;
}
}
@Override
public boolean confirmExecute(TccContext context) {
String transactionId = context.getTransactionId();
try {
// 1. 查询冻结记录
AccountFreeze freeze = freezeMapper.selectByTransactionId(transactionId);
if (freeze == null || freeze.getStatus() == FreezeStatus.CONFIRMED) {
return true; // 幂等处理
}
// 2. 确认扣款,清除冻结金额
Account account = accountMapper.selectById(freeze.getAccountId());
account.setFrozenAmount(account.getFrozenAmount().subtract(freeze.getAmount()));
accountMapper.updateById(account);
// 3. 更新冻结记录状态
freeze.setStatus(FreezeStatus.CONFIRMED);
freeze.setUpdateTime(LocalDateTime.now());
freezeMapper.updateById(freeze);
log.info("TCC Confirm阶段成功: transactionId={}", transactionId);
return true;
} catch (Exception e) {
log.error("TCC Confirm阶段失败", e);
return false;
}
}
@Override
public boolean cancelExecute(TccContext context) {
String transactionId = context.getTransactionId();
try {
// 1. 查询冻结记录
AccountFreeze freeze = freezeMapper.selectByTransactionId(transactionId);
if (freeze == null || freeze.getStatus() == FreezeStatus
剩余60%内容,订阅专栏后可继续查看/也可单篇购买
Java面试圣经 文章被收录于专栏
Java面试圣经,带你练透java圣经
查看10道真题和解析
