18.8.3 消息重复消费与幂等性处理
1. 消息重复消费问题分析
1.1 重复消费产生原因
public class MessageDuplicationAnalysis {
/*
* 消息重复消费产生原因:
*
* 1. 网络异常
* - 消费者处理完消息后,ACK确认丢失
* - 网络抖动导致消息重新投递
* - 消费者重启后重新消费未确认消息
*
* 2. 消费者异常
* - 消费者处理过程中崩溃
* - 消费者处理超时
* - 消费者重启或扩容
*
* 3. 消息队列机制
* - At Least Once语义保证
* - 消息重试机制
* - 分区重平衡
*
* 4. 业务逻辑问题
* - 消费逻辑执行时间过长
* - 消费者并发处理相同消息
* - 事务回滚导致消息重新消费
*/
public void demonstrateMessageDuplication() {
System.out.println("=== 消息重复消费场景演示 ===");
MockMessageQueue messageQueue = new MockMessageQueue();
demonstrateNetworkFailure(messageQueue);
demonstrateConsumerCrash(messageQueue);
demonstrateTimeoutScenario(messageQueue);
demonstrateConcurrentConsumption(messageQueue);
}
private void demonstrateNetworkFailure(MockMessageQueue messageQueue) {
System.out.println("--- 网络异常导致重复消费 ---");
MockConsumer consumer = new MockConsumer("consumer-1");
System.out.println("1. 消费者处理消息:");
Message message = new Message("msg-001", "订单创建", "order:12345");
// 消费者处理消息
boolean processed = consumer.processMessage(message);
System.out.println("消息处理结果: " + processed);
System.out.println("\n2. ACK确认失败 (网络异常):");
boolean ackSuccess = messageQueue.acknowledgeMessage(message.getId(), false); // 模拟ACK失败
System.out.println("ACK确认结果: " + ackSuccess);
System.out.println("\n3. 消息重新投递:");
if (!ackSuccess) {
System.out.println("由于ACK失败,消息将重新投递");
boolean reprocessed = consumer.processMessage(message);
System.out.println("重复处理结果: " + reprocessed);
}
System.out.println("\n4. 风险分析:");
System.out.println(" - 订单可能被重复创建");
System.out.println(" - 库存可能被重复扣减");
System.out.println(" - 用户可能收到重复通知");
}
private void demonstrateConsumerCrash(MockMessageQueue messageQueue) {
System.out.println("\n--- 消费者崩溃导致重复消费 ---");
MockConsumer consumer = new MockConsumer("consumer-2");
System.out.println("1. 消费者开始处理消息:");
Message message = new Message("msg-002", "支付处理", "payment:67890");
try {
// 模拟消费者在处理过程中崩溃
consumer.processMessageWithCrash(message);
} catch (RuntimeException e) {
System.out.println("消费者崩溃: " + e.getMessage());
}
System.out.println("\n2. 消费者重启后重新消费:");
MockConsumer newConsumer = new MockConsumer("consumer-2-restart");
boolean reprocessed = newConsumer.processMessage(message);
System.out.println("重启后处理结果: " + reprocessed);
System.out.println("\n3. 问题影响:");
System.out.println(" - 支付可能被重复处理");
System.out.println(" - 资金可能被重复扣除");
System.out.println(" - 需要幂等性保证");
}
private void demonstrateTimeoutScenario(MockMessageQueue messageQueue) {
System.out.println("\n--- 处理超时导致重复消费 ---");
MockConsumer slowConsumer = new MockConsumer("slow-consumer");
System.out.println("1. 消费者处理耗时消息:");
Message message = new Message("msg-003", "数据同步", "sync:data:large");
// 模拟处理超时
boolean processed = slowConsumer.processSlowMessage(message, 5000); // 5秒处理时间
System.out.println("消息处理结果: " + processed);
System.out.println("\n2. 消息队列超时重投:");
if (messageQueue.isMessageTimeout(message.getId(), 3000)) { // 3秒超时
System.out.println("消息处理超时,重新投递给其他消费者");
MockConsumer fastConsumer = new MockConsumer("fast-consumer");
boolean reprocessed = fastConsumer.processMessage(message);
System.out.println("其他消费者处理结果: " + reprocessed);
}
System.out.println("\n3. 超时处理策略:");
System.out.println(" - 设置合理的消息超时时间");
System.out.println(" - 优化消费者处理逻辑");
System.out.println(" - 实现幂等性处理");
}
private void demonstrateConcurrentConsumption(MockMessageQueue messageQueue) {
System.out.println("\n--- 并发消费导致重复处理 ---");
Message message = new Message("msg-004", "库存扣减", "inventory:reduce:100");
System.out.println("1. 多个消费者同时处理相同消息:");
// 启动多个消费者线程
for (int i = 1; i <= 3; i++) {
final int consumerId = i;
new Thread(() -> {
MockConsumer consumer = new MockConsumer("concurrent-consumer-" + consumerId);
boolean processed = consumer.processMessage(message);
System.out.println("消费者" + consumerId + "处理结果: " + processed);
}).start();
}
try {
Thread.sleep(1000); // 等待并发处理完成
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
System.out.println("\n2. 并发问题分析:");
System.out.println(" - 相同消息被多个消费者处理");
System.out.println(" - 可能导致数据不一致");
System.out.println(" - 需要分布式锁或幂等性控制");
}
}
// 消息实体
class Message {
private String id;
private String type;
private String payload;
private long timestamp;
public Message(String id, String type, String payload) {
this.id = id;
this.type = type;
this.payload = payload;
this.timestamp = System.currentTimeMillis();
}
public String getId() { return id; }
public String getType() { return type; }
public String getPayload() { return payload; }
public long getTimestamp() { return timestamp; }
@Override
public String toString() {
return String.format("Message{id='%s', type='%s', payload='%s'}", id, type, payload);
}
}
// 模拟消息队列
class MockMessageQueue {
private java.util.Map<String, Long> messageTimestamps = new java.util.concurrent.ConcurrentHashMap<>();
public boolean acknowledgeMessage(String messageId, boolean success) {
if (success) {
messageTimestamps.remove(messageId);
System.out.println(" 消息ACK成功: " + messageId);
return true;
} else {
System.out.println(" 消息ACK失败: " + messageId);
return false;
}
}
public boolean isMessageTimeout(String messageId, long timeoutMs) {
Long timestamp = messageTimestamps.get(messageId);
if (timestamp != null) {
return System.currentTimeMillis() - timestamp > timeoutMs;
}
return false;
}
}
// 模拟消费者
class MockConsumer {
private String consumerId;
private java.util.Set<String> processedMessages = new java.util.concurrent.ConcurrentHashMap<String, Boolean>().keySet(java.util.concurrent.ConcurrentHashMap.newKeySet());
public MockConsumer(String consumerId) {
this.consumerId = consumerId;
}
public boolean processMessage(Message message) {
System.out.println(" " + consumerId + " 开始处理消息: " + message);
// 模拟业务处理
try {
Thread.sleep(100); // 模拟处理时间
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
// 记录已处理消息
processedMessages.add(message.getId());
System.out.println(" " + consumerId + " 处理完成: " + message.getId());
return true;
}
public boolean processMessageWithCrash(Message message) {
System.out.println(" " + consumerId + " 开始处理消息: " + message);
// 模拟处理过程中崩溃
try {
Thread.sleep(50);
throw new RuntimeException("消费者崩溃");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
return false;
}
public boolean processSlowMessage(Message message, long processTimeMs) {
System.out.println(" " + consumerId + " 开始处理耗时消息: " + message);
try {
Thread.sleep(processTimeMs);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
System.out.println(" " + consumerId + " 耗时处理完成: " + message.getId());
return true;
}
}
2. 幂等性设计与实现
2.1 幂等性实现方案
public class IdempotencyImplementation {
/*
* 幂等性实现方案:
*
* 1. 唯一键去重
* - 数据库唯一约束
* - Redis Set去重
* - 业务唯一标识
*
* 2. 状态机控制
* - 订单状态流转
* - 支付状态控制
* - 业务状态检查
*
* 3. 分布式锁
*
剩余60%内容,订阅专栏后可继续查看/也可单篇购买
Java面试圣经 文章被收录于专栏
Java面试圣经,带你练透java圣经
