Java秋招面试之消息队列与分布式话题

第9章 消息队列与分布式

面试重要程度:⭐⭐⭐⭐

常见提问方式:消息可靠性保证、分布式事务、服务治理

预计阅读时间:40分钟

开场白

兄弟,消息队列和分布式系统是现代后端架构的核心!随着微服务架构的普及,这块知识在面试中的重要性越来越高。不管是RabbitMQ、Kafka,还是分布式事务、服务治理,都是面试官爱考的重点。

今天我们就把这些分布式核心技术搞透,让你在面试中展现出对大型系统架构的深度理解。

📨 9.1 消息队列选型对比

RabbitMQ vs Kafka vs RocketMQ

面试必问:

面试官:"常见的消息队列有哪些?各自的特点和适用场景是什么?"

三大消息队列对比:

| 对比项 | RabbitMQ | Kafka | RocketMQ |
| --- | --- | --- | --- |
| 开发语言 | Erlang | Scala/Java | Java |
| 协议支持 | AMQP、STOMP、MQTT | 自定义协议 | 自定义协议 |
| 消息顺序 | 支持 | 分区内有序 | 支持 |
| 消息可靠性 | 高 | 高 | 高 |
| 性能 | 万级QPS | 十万级QPS | 十万级QPS |
| 延时 | 微秒级 | 毫秒级 | 毫秒级 |
| 可用性 | 高(镜像队列) | 高(副本机制) | 高(主从架构) |
| 消息回溯 | 不支持 | 支持 | 支持 |
| 消息过滤 | 不支持 | 不支持 | 支持 |
| 事务消息 | 支持 | 支持 | 支持 |
| 定时消息 | 插件支持 | 不支持 | 支持 |

使用场景分析:

// RabbitMQ适用场景
/*
1. 业务逻辑复杂,需要多种消息模式
2. 对消息可靠性要求极高
3. 需要复杂的路由规则
4. 系统规模中等,QPS要求不是特别高
*/

// Kafka适用场景  
/*
1. 大数据处理,日志收集
2. 高吞吐量场景
3. 流式数据处理
4. 需要消息回溯功能
*/

// RocketMQ适用场景
/*
1. 电商、金融等对可靠性要求高的场景
2. 需要事务消息
3. 需要定时消息、延时消息
4. 需要消息过滤功能
*/

RabbitMQ核心概念

AMQP模型:

@Configuration
@EnableRabbit
public class RabbitMQConfig {
    
    // 1. 直连交换机(Direct Exchange)
    @Bean
    public DirectExchange directExchange() {
        return new DirectExchange("direct.exchange", true, false);
    }
    
    // 2. 主题交换机(Topic Exchange)
    @Bean
    public TopicExchange topicExchange() {
        return new TopicExchange("topic.exchange", true, false);
    }
    
    // 3. 扇形交换机(Fanout Exchange)
    @Bean
    public FanoutExchange fanoutExchange() {
        return new FanoutExchange("fanout.exchange", true, false);
    }
    
    // 4. 头部交换机(Headers Exchange)
    @Bean
    public HeadersExchange headersExchange() {
        return new HeadersExchange("headers.exchange", true, false);
    }
    
    // 队列定义
    @Bean
    public Queue orderQueue() {
        return QueueBuilder.durable("order.queue")
            .withArgument("x-message-ttl", 60000)  // 消息TTL
            .withArgument("x-max-length", 1000)    // 队列最大长度
            .build();
    }
    
    @Bean
    public Queue deadLetterQueue() {
        return QueueBuilder.durable("order.dlq").build();
    }
    
    // 绑定关系
    @Bean
    public Binding orderBinding() {
        return BindingBuilder.bind(orderQueue())
            .to(directExchange())
            .with("order.create");
    }
    
    // 死信队列绑定
    @Bean
    public Queue orderQueueWithDLX() {
        return QueueBuilder.durable("order.queue.dlx")
            .withArgument("x-dead-letter-exchange", "dlx.exchange")
            .withArgument("x-dead-letter-routing-key", "order.dead")
            .build();
    }
}

消息生产者:

@Service
public class OrderMessageProducer {
    
    @Autowired
    private RabbitTemplate rabbitTemplate;
    
    @Autowired
    private OrderService orderService;
    
    // 发送普通消息
    public void sendOrderMessage(Order order) {
        rabbitTemplate.convertAndSend("direct.exchange", "order.create", order);
    }
    
    // 发送延时消息(基于消息TTL;注意TTL只在队首判定,不同TTL的消息会互相阻塞,生产上常配合死信队列或延迟消息插件)
    public void sendDelayMessage(Order order, int delaySeconds) {
        rabbitTemplate.convertAndSend("direct.exchange", "order.delay", order, message -> {
            message.getMessageProperties().setExpiration(String.valueOf(delaySeconds * 1000));
            return message;
        });
    }
    
    // 发送"事务"消息:本地事务与发消息放在同一方法中
    // 注意:需要rabbitTemplate.setChannelTransacted(true),消息发送才会随本地事务一起提交/回滚,且这只是本地方案,并非严格的分布式事务
    @Transactional
    public void sendTransactionalMessage(Order order) {
        // 本地事务操作
        orderService.saveOrder(order);
        
        // 发送消息(在同一事务中)
        rabbitTemplate.convertAndSend("direct.exchange", "order.create", order);
    }
    
    // 发送确认消息(注意:Confirm/Returns回调是RabbitTemplate级别的全局回调,实际应在初始化时注册一次,这里仅作演示,完整配置见下方示意)
    public void sendConfirmMessage(Order order) {
        // 设置确认回调
        rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
            if (ack) {
                log.info("消息发送成功: {}", correlationData);
            } else {
                log.error("消息发送失败: {}, 原因: {}", correlationData, cause);
                // 重试或记录失败日志
            }
        });
        
        // 设置返回回调
        rabbitTemplate.setReturnsCallback(returned -> {
            log.error("消息被退回: {}", returned);
            // 处理被退回的消息
        });
        
        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
        rabbitTemplate.convertAndSend("direct.exchange", "order.create", order, correlationData);
    }
}
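
补充一点:ConfirmCallback/ReturnsCallback 是 RabbitTemplate 级别的全局回调,像上面那样在每次发送前设置并不合适,实践中应在初始化时注册一次,并开启 publisher confirm/return。下面是一个最小配置示意(按 Spring AMQP 2.x 的 API 写法,连接地址等均为示例值):

@Slf4j // Lombok日志注解
@Configuration
public class RabbitConfirmConfig {
    
    @Bean
    public CachingConnectionFactory rabbitConnectionFactory() {
        CachingConnectionFactory factory = new CachingConnectionFactory("localhost");
        // 开启异步confirm(CORRELATED模式下回调能拿到CorrelationData)
        factory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED);
        // 开启return机制:路由不到队列的消息会触发ReturnsCallback
        factory.setPublisherReturns(true);
        return factory;
    }
    
    @Bean
    public RabbitTemplate rabbitTemplate(CachingConnectionFactory connectionFactory) {
        RabbitTemplate template = new RabbitTemplate(connectionFactory);
        template.setMandatory(true); // 配合ReturnsCallback使用
        // 全局注册一次确认/退回回调
        template.setConfirmCallback((correlationData, ack, cause) -> {
            if (ack) {
                log.info("消息到达Broker: {}", correlationData);
            } else {
                log.error("消息未到达Broker: {}, 原因: {}", correlationData, cause);
            }
        });
        template.setReturnsCallback(returned -> log.error("消息被退回: {}", returned));
        return template;
    }
}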

消息消费者:

@Component
public class OrderMessageConsumer {
    
    @Autowired
    private OrderService orderService;
    
    // 基本消费
    @RabbitListener(queues = "order.queue")
    public void handleOrderMessage(Order order, Channel channel, 
                                  @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) {
        try {
            log.info("收到订单消息: {}", order);
            orderService.processOrder(order);
            
            // 手动确认
            channel.basicAck(deliveryTag, false);
            
        } catch (Exception e) {
            log.error("处理订单消息失败", e);
            try {
                // 拒绝消息,重新入队
                channel.basicNack(deliveryTag, false, true);
            } catch (IOException ioException) {
                log.error("拒绝消息失败", ioException);
            }
        }
    }
    
    // 批量消费
    @RabbitListener(queues = "order.batch.queue", 
                   containerFactory = "batchRabbitListenerContainerFactory")
    public void handleBatchOrderMessages(List<Order> orders) {
        log.info("批量处理订单消息,数量: {}", orders.size());
        orderService.batchProcessOrders(orders);
    }
    
    // 死信队列消费
    @RabbitListener(queues = "order.dlq")
    public void handleDeadLetterMessage(Order order, 
                                       @Header Map<String, Object> headers) {
        log.warn("处理死信消息: {}, headers: {}", order, headers);
        
        // 记录失败原因
        String reason = (String) headers.get("x-first-death-reason");
        orderService.recordFailedOrder(order, reason);
        
        // 可以选择重试或人工处理
    }
}
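
上面批量消费引用的 batchRabbitListenerContainerFactory 容器工厂需要单独声明,核心是开启批量监听。一个简化示意(基于 Spring AMQP 2.2+ 的批量能力,批量大小等参数为示例值):

@Configuration
public class RabbitBatchConfig {
    
    @Bean
    public SimpleRabbitListenerContainerFactory batchRabbitListenerContainerFactory(
            ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setBatchListener(true);        // 监听方法的入参变为List
        factory.setConsumerBatchEnabled(true); // 由容器在消费端把消息聚合成批
        factory.setBatchSize(50);              // 每批最多50条(示例值)
        factory.setReceiveTimeout(1000L);      // 凑批的等待时间,毫秒(示例值)
        return factory;
    }
}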

Kafka核心概念

Kafka架构:

@Configuration
@EnableKafka
public class KafkaConfig {
    
    // 生产者配置
    @Bean
    public ProducerFactory<String, Object> producerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        
        // 可靠性配置
        props.put(ProducerConfig.ACKS_CONFIG, "all");  // 等待所有副本确认
        props.put(ProducerConfig.RETRIES_CONFIG, 3);   // 重试次数
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true); // 幂等性
        
        // 性能配置
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);  // 批次大小
        props.put(ProducerConfig.LINGER_MS_CONFIG, 10);      // 等待时间
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432); // 缓冲区大小
        
        return new DefaultKafkaProducerFactory<>(props);
    }
    
    @Bean
    public KafkaTemplate<String, Object> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
    
    // 消费者配置
    @Bean
    public ConsumerFactory<String, Object> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "order-service");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        
        // 消费配置
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); // 手动提交
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100);     // 每次拉取记录数
        
        return new DefaultKafkaConsumerFactory<>(props);
    }
    
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Object> factory = 
            new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        
        // 并发配置
        factory.setConcurrency(3); // 3个消费者线程
        
        // 手动确认模式
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        
        return factory;
    }
}

Kafka生产者:

@Service
public class KafkaOrderProducer {
    
    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;
    
    // 发送消息
    public void sendOrderEvent(String topic, Order order) {
        kafkaTemplate.send(topic, order.getId().toString(), order)
            .addCallback(
                result -> log.info("消息发送成功: {}", result),
                failure -> log.error("消息发送失败", failure)
            );
    }
    
    // 发送事务消息(前提:ProducerFactory需配置事务ID前缀/transactional.id,否则无法开启Kafka事务)
    @Transactional
    public void sendTransactionalOrderEvent(Order order) {
        kafkaTemplate.executeInTransaction(operations -> {
            // 发送多个相关消息
            operations.send("order-created", order.getId().toString(), order);
            operations.send("inventory-update", order.getProductId().toString(), 
                new InventoryUpdateEvent(order.getProductId(), -order.getQuantity()));
            operations.send("payment-request", order.getId().toString(), 
                new PaymentRequestEvent(order.getId(), order.getAmount()));
            
            return true;
        });
    }
    
    // 发送分区消息
    public void sendPartitionedMessage(Order order) {
        // 根据用户ID取模选择分区,保证同一用户的消息有序
        // (3为该topic的分区数,实际应从元数据获取;也可以只指定key,由Kafka按key哈希路由到固定分区)
        int partition = Math.abs(order.getUserId().hashCode()) % 3;
        kafkaTemplate.send("order-events", partition, order.getId().toString(), order);
    }
}

Kafka消费者:

@Component
public class KafkaOrderConsumer {
    
    @Autowired
    private OrderService orderService;
    
    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate; // 转发死信消息时使用
    
    // 基本消费
    @KafkaListener(topics = "order-created", groupId = "order-service")
    public void handleOrderCreated(ConsumerRecord<String, Order> record, Acknowledgment ack) {
        try {
            Order order = record.value();
            log.info("处理订单创建事件: partition={}, offset={}, order={}", 
                record.partition(), record.offset(), order);
            
            orderService.handleOrderCreated(order);
            
            // 手动确认
            ack.acknowledge();
            
        } catch (Exception e) {
            log.error("处理订单创建事件失败", e);
            // 不确认,消息会重新消费
        }
    }
    
    // 批量消费
    @KafkaListener(topics = "order-events", 
                   containerFactory = "batchKafkaListenerContainerFactory")
    public void handleOrderEventsBatch(List<ConsumerRecord<String, Order>> records, 
                                      Acknowledgment ack) {
        log.info("批量处理订单事件,数量: {}", records.size());
        
        List<Order> orders = records.stream()
            .map(ConsumerRecord::value)
            .collect(Collectors.toList());
        
        orderService.batchProcessOrders(orders);
        ack.acknowledge();
    }
    
    // 错误处理
    @KafkaListener(topics = "order-created")
    public void handleOrderCreatedWithRetry(Order order, 
                                           @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
                                           @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
                                           @Header(KafkaHeaders.OFFSET) long offset) {
        try {
            orderService.handleOrderCreated(order);
        } catch (RetryableException e) {
            log.warn("处理失败,将重试: topic={}, partition={}, offset={}", topic, partition, offset);
            throw e; // 抛出异常,触发重试
        } catch (Exception e) {
            log.error("处理失败,发送到死信队列: topic={}, partition={}, offset={}", topic, partition, offset);
            sendToDeadLetterTopic(order, e);
        }
    }
    
    private void sendToDeadLetterTopic(Order order, Exception e) {
        // 发送到死信主题
        kafkaTemplate.send("order-created-dlt", order.getId().toString(), 
            new DeadLetterMessage(order, e.getMessage()));
    }
}
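
同样,批量消费引用的 batchKafkaListenerContainerFactory 也要显式声明,关键是打开 batchListener,可以直接加到前面的 KafkaConfig 里(基于 spring-kafka 2.x 的写法,参数为示例值):

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Object> batchKafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Object> factory = 
            new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setBatchListener(true); // 监听方法入参为List<ConsumerRecord>
        factory.setConcurrency(3);
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        return factory;
    }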

🔄 9.2 分布式事务解决方案

2PC、3PC、TCC、Saga模式

面试重点:

面试官:"分布式事务有哪些解决方案?各自的优缺点是什么?"

分布式事务对比:

| 方案 | 一致性 | 适用场景 |
| --- | --- | --- |
| 2PC | 强一致 | 小规模系统 |
| 3PC | 强一致 | 改进2PC |
| TCC | 最终一致 | 金融支付 |
| Saga | 最终一致 | 长流程业务 |
| 消息事务 | 最终一致 | 异步场景 |
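
表中的"消息事务"方案,常见落地方式是 RocketMQ 的事务消息:先发送半消息(对消费者不可见),执行本地事务后再 commit/rollback,Broker 会对未决事务发起回查。下面是基于 rocketmq-spring-boot-starter 的简化示意(topic、监听器逻辑均为示例,方法签名以 rocketmq-spring 2.1+ 为准):

@Service
public class OrderTxMessageProducer {
    
    @Autowired
    private RocketMQTemplate rocketMQTemplate;
    
    // 发送半消息,随后框架回调本地事务监听器
    public void sendOrderWithTx(Order order) {
        Message<Order> message = MessageBuilder.withPayload(order).build();
        rocketMQTemplate.sendMessageInTransaction("order-topic", message, order);
    }
}

// 本地事务执行与回查
@RocketMQTransactionListener
class OrderTxListener implements RocketMQLocalTransactionListener {
    
    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        try {
            // 执行本地事务,例如把订单落库(业务代码略)
            return RocketMQLocalTransactionState.COMMIT;
        } catch (Exception e) {
            return RocketMQLocalTransactionState.ROLLBACK;
        }
    }
    
    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
        // Broker回查:根据订单是否已落库决定提交还是回滚(这里简化为直接提交)
        return RocketMQLocalTransactionState.COMMIT;
    }
}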

TCC模式实现

TCC框架实现:

// TCC接口定义
public interface TccTransaction {
    
    /**
     * Try阶段:尝试执行业务,预留资源
     */
    boolean tryExecute(TccContext context);
    
    /**
     * Confirm阶段:确认执行业务,提交资源
     */
    boolean confirmExecute(TccContext context);
    
    /**
     * Cancel阶段:取消执行业务,释放资源
     */
    boolean cancelExecute(TccContext context);
}

// 账户服务TCC实现
@Service
public class AccountTccService implements TccTransaction {
    
    @Autowired
    private AccountMapper accountMapper;
    
    @Autowired
    private AccountFreezeMapper freezeMapper;
    
    @Override
    public boolean tryExecute(TccContext context) {
        TransferRequest request = context.getRequest(TransferRequest.class);
        
        try {
            // 1. 检查账户余额
            Account account = accountMapper.selectById(request.getFromAccountId());
            if (account.getBalance().compareTo(request.getAmount()) < 0) {
                return false; // 余额不足
            }
            
            // 2. 冻结资金
            AccountFreeze freeze = new AccountFreeze();
            freeze.setTransactionId(context.getTransactionId());
            freeze.setAccountId(request.getFromAccountId());
            freeze.setAmount(request.getAmount());
            freeze.setStatus(FreezeStatus.FROZEN);
            freeze.setCreateTime(LocalDateTime.now());
            
            freezeMapper.insert(freeze);
            
            // 3. 更新账户余额(预扣)
            account.setBalance(account.getBalance().subtract(request.getAmount()));
            account.setFrozenAmount(account.getFrozenAmount().add(request.getAmount()));
            accountMapper.updateById(account);
            
            log.info("TCC Try阶段成功: transactionId={}, amount={}", 
                context.getTransactionId(), request.getAmount());
            return true;
            
        } catch (Exception e) {
            log.error("TCC Try阶段失败", e);
            return false;
        }
    }
    
    @Override
    public boolean confirmExecute(TccContext context) {
        String transactionId = context.getTransactionId();
        
        try {
            // 1. 查询冻结记录
            AccountFreeze freeze = freezeMapper.selectByTransactionId(transactionId);
            if (freeze == null || freeze.getStatus() == FreezeStatus.CONFIRMED) {
                return true; // 幂等处理
            }
            
            // 2. 确认扣款,清除冻结金额
            Account account = accountMapper.selectById(freeze.getAccountId());
            account.setFrozenAmount(account.getFrozenAmount().subtract(freeze.getAmount()));
            accountMapper.updateById(account);
            
            // 3. 更新冻结记录状态
            freeze.setStatus(FreezeStatus.CONFIRMED);
            freeze.setUpdateTime(LocalDateTime.now());
            freezeMapper.updateById(freeze);
            
            log.info("TCC Confirm阶段成功: transactionId={}", transactionId);
            return true;
            
        } catch (Exception e) {
            log.error("TCC Confirm阶段失败", e);
            return false;
        }
    }
    
    @Override
    public boolean cancelExecute(TccContext context) {
        String transactionId = context.getTransactionId();
        
        try {
            // 1. 查询冻结记录
            AccountFreeze freeze = freezeMapper.selectByTransactionId(transactionId);
            if (freeze == null || freeze.getStatus() == FreezeStatus.CANCELLED) {
                return true; // 幂等处理
            }
            
            // 2. 恢复账户余额
            Account account = accountMapper.selectById(freeze.getAccountId());
            account.setBalance(account.getBalance().add(freeze.getAmount()));
            account.setFrozenAmount(account.getFrozenAmount().subtract(freeze.getAmount()));
            accountMapper.updateById(account);
            
            // 3. 更新冻结记录状态
            freeze.setStatus(FreezeStatus.CANCELLED);
            freeze.setUpdateTime(LocalDateTime.now());
            freezeMapper.updateById(freeze);
            
            log.info("TCC Cancel阶段成功: transactionId={}", transactionId);
            return true;
            
        } catch (Exception e) {
            log.error("TCC Cancel阶段失败", e);
            return false;
        }
    }
}

// TCC事务管理器
@Service
public class TccTransactionManager {
    
    private final Map<String, List<TccTransaction>> transactionMap = new ConcurrentHashMap<>();
    
    @Autowired
    private TccLogMapper tccLogMapper;
    
    public String beginTransaction() {
        String transactionId = UUID.randomUUID().toString();
        transactionMap.put(transactionId, new ArrayList<>());
        
        // 记录事务日志
        TccLog log = new TccLog();
        log.setTransactionId(transactionId);
        log.setStatus(TccStatus.TRYING);
        log.setCreateTime(LocalDateTime.now());
        tccLogMapper.insert(log);
        
        return transactionId;
    }
    
    public void enlistResource(String transactionId, TccTransaction transaction) {
        List<TccTransaction> transactions = transactionMap.get(transactionId);
        if (transactions != null) {
            transactions.add(transaction);
        }
    }
    
    public boolean commit(String transactionId) {
        List<TccTransaction> transactions = transactionMap.get(transactionId);
        if (transactions == null) {
            return false;
        }
        
        // 执行所有Try阶段
        TccContext context = new TccContext(transactionId);
        for (TccTransaction transaction : transactions) {
            if (!transaction.tryExecute(context)) {
                // Try失败,执行Cancel
                rollback(transactionId);
                return false;
            }
        }
        
        // 执行所有Confirm阶段
        boolean allConfirmed = true;
        for (TccTransaction transaction : transactions) {
            if (!transaction.confirmExecute(context)) {
                allConfirmed = false;
                break;
            }
        }
        
        if (allConfirmed) {
            updateTransactionStatus(transactionId, TccStatus.CONFIRMED);
            transactionMap.remove(transactionId);
            return true;
        } else {
            // Confirm失败,需要补偿
            scheduleCompensation(transactionId);
            return false;
        }
    }
    
    public void rollback(String transactionId) {
        List<TccTransaction> transactions = transactionMap.get(transactionId);
        if (transactions == null) {
            return;
        }
        
        TccContext context = new TccContext(transactionId);
        for (TccTransaction transaction : transactions) {
            transaction.cancelExecute(context);
        }
        
        updateTransactionStatus(transactionId, TccStatus.CANCELLED);
        transactionMap.remove(transactionId);
    }
    
    private void scheduleCompensation(String transactionId) {
        // 安排补偿任务,定期重试Confirm或Cancel
        CompletableFuture.runAsync(() -> {
            int retryCount = 0;
            while (retryCount < 5) {
                try {
                    Thread.sleep(5000 * (retryCount + 1)); // 指数退避
                    if (commit(transactionId)) {
                        break;
                    }
                    retryCount++;
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    break;
                }
            }
            
            if (retryCount >= 5) {
                // 人工介入
                updateTransactionStatus(transactionId, TccStatus.FAILED);
                sendAlert("TCC事务补偿失败,需要人工处理: " + transactionId);
            }
        });
    }
}
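
结合上面的接口,一次转账大致这样发起(只演示调用骨架;原示例没有给出 TccContext 如何携带 TransferRequest,这里不展开):

@Service
public class TransferService {
    
    @Autowired
    private TccTransactionManager tccTransactionManager;
    
    @Autowired
    private AccountTccService accountTccService;
    
    public boolean transfer(TransferRequest request) {
        // 1. 开启TCC事务,拿到全局事务ID
        String transactionId = tccTransactionManager.beginTransaction();
        
        // 2. 登记参与者(实际还会登记转入方账户、积分等其他TccTransaction实现)
        tccTransactionManager.enlistResource(transactionId, accountTccService);
        
        // 3. 提交:先执行所有Try,全部成功再Confirm,任一Try失败则整体Cancel
        return tccTransactionManager.commit(transactionId);
    }
}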

Saga模式实现

Saga编排器:

// Saga步骤定义
public interface SagaStep {
    String getName();
    boolean execute(SagaContext context);
    boolean compensate(SagaContext context);
}

// 订单Saga步骤
@Component
public class CreateOrderStep implements SagaStep {
    
    @Autowired
    private OrderService orderService;
    
    @Override
    public String getName() {
        return "CreateOrder";
    }
    
    @Override
    public boolean execute(SagaContext context) {
        OrderRequest request = context.getRequest(OrderRequest.class);
        try {
            Order order = orderService.createOrder(request);
            context.setData("orderId", order.getId());
            return true;
        } catch (Exception e) {
            log.error("创建订单失败", e);
            return false;
        }
    }
    
    @Override
    public boolean compensate(SagaContext context) {
        Long orderId = context.getData("orderId", Long.class);
        if (orderId != null) {
            try {
                orderService.cancelOrder(orderId);
                return true;
            } catch (Exception e) {
                log.error("取消订单失败", e);
                return false;
            }
        }
        return true;
    }
}

@Component
public class ReserveInventoryStep implements SagaStep {
    
    @Autowired
    private InventoryService inventoryService;
    
    @Override
    public String getName() {
        return "ReserveInventory";
    }
    
    @Override
    public boolean execute(SagaContext context) {
        OrderRequest request = context.getRequest(OrderRequest.class);
        try {
            String reservationId = inventoryService.reserveInventory(
                request.getProductId(), request.getQuantity());
            context.setData("reservationId", reservationId);
            return true;
        } catch (Exception e) {
            log.error("预留库存失败", e);
            return false;
        }
    }
    
    @Override
    public boolean compensate(SagaContext context) {
        String reservationId = context.getData("reservationId", String.class);
        if (reservationId != null) {
            try {
                inventoryService.releaseReservation(reservationId);
                return true;
            } catch (Exception e) {
                log.error("释放库存预留失败", e);
                return false;
            }
        }
        return true;
    }
}

// Saga编排器
@Service
public class SagaOrchestrator {
    
    @Autowired
    private List<SagaStep> sagaSteps;
    
    @Autowired
    private SagaLogMapper sagaLogMapper;
    
    public boolean executeSaga(String sagaId, Object request) {
        SagaContext context = new SagaContext(sagaId, request);
        List<SagaStep> executedSteps = new ArrayList<>();
        
        try {
            // 记录Saga开始
            recordSagaLog(sagaId, SagaStatus.STARTED, null);
            
            // 顺序执行所有步骤
            for (SagaStep step : sagaSteps) {
                recordSagaLog(sagaId, SagaStatus.EXECUTING, step.getName());
                
                if (step.execute(context)) {
                    executedSteps.add(step);
                    recordSagaLog(sagaId, SagaStatus.STEP_COMPLETED, step.getName());
                } else {
                    // 步骤失败,执行补偿
                    recordSagaLog(sagaId, SagaStatus.STEP_FAILED, step.getName());
                    compensate(sagaId, executedSteps, context);
                    return false;
                }
            }
            
            // 所有步骤成功
            recordSagaLog(sagaId, SagaStatus.COMPLETED, null);
            return true;
            
        } catch (Exception e) {
            log.error("Saga执行异常: sagaId={}", sagaId, e);
            recordSagaLog(sagaId, SagaStatus.FAILED, e.getMessage());
            compensate(sagaId, executedSteps, context);
            return false;
        }
    }
    
    private void compensate(String sagaId, List<SagaStep> executedSteps, SagaContext context) {
        recordSagaLog(sagaId, SagaStatus.COMPENSATING, null);
        
        // 逆序执行补偿
        Collections.reverse(executedSteps);
        for (SagaStep step : executedSteps) {
            try {
                recordSagaLog(sagaId, SagaStatus.COMPENSATING, step.getName());
                step.compensate(context);
                recordSagaLog(sagaId, SagaStatus.COMPENSATED, step.getName());
            } catch (Exception e) {
                log.error("补偿步骤失败: sagaId={}, step={}", sagaId, step.getName(), e);
                recordSagaLog(sagaId, SagaStatus.COMPENSATION_FAILED, step.getName());
                // 继续补偿其他步骤
            }
        }
        
        recordSagaLog(sagaId, SagaStatus.COMPENSATED, null);
    }
    
    private void recordSagaLog(String sagaId, SagaStatus status, String stepName) {
        SagaLog log = new SagaLog();
        log.setSagaId(sagaId);
        log.setStatus(status);
        log.setStepName(stepName);
        log.setCreateTime(LocalDateTime.now());
        sagaLogMapper.insert(log);
    }
}
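
上面的步骤和编排器都依赖 SagaContext 在步骤之间传递请求和中间数据,按其用法,它大致是这样一个简单容器(示意实现):

public class SagaContext {
    
    private final String sagaId;
    private final Object request;
    // 各步骤产生的中间数据,如orderId、reservationId
    private final Map<String, Object> data = new ConcurrentHashMap<>();
    
    public SagaContext(String sagaId, Object request) {
        this.sagaId = sagaId;
        this.request = request;
    }
    
    public String getSagaId() {
        return sagaId;
    }
    
    public <T> T getRequest(Class<T> type) {
        return type.cast(request);
    }
    
    public void setData(String key, Object value) {
        data.put(key, value);
    }
    
    public <T> T getData(String key, Class<T> type) {
        Object value = data.get(key);
        return value == null ? null : type.cast(value);
    }
}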

🏗️ 9.3 微服务架构设计

Spring Cloud Alibaba技术栈

面试重点:

面试官:"微服务架构中有哪些核心组件?Spring Cloud Alibaba和Netflix有什么区别?"

技术栈对比:

| 组件 | Spring Cloud Netflix | Spring Cloud Alibaba |
| --- | --- | --- |
| 服务注册发现 | Eureka | Nacos |
| 配置中心 | Config Server | Nacos Config |
| 服务网关 | Zuul/Gateway | Gateway |
| 负载均衡 | Ribbon | LoadBalancer |
| 熔断器 | Hystrix | Sentinel |
| 分布式事务 | - | Seata |
| 消息队列 | - | RocketMQ |
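
表中的熔断限流组件 Sentinel,典型用法是在资源方法上加 @SentinelResource:被限流/熔断时走 blockHandler,业务异常时走 fallback。一个最小示意如下(限流、熔断规则一般在 Sentinel 控制台配置,这里省略;OrderMapper 为假设的DAO):

@Slf4j
@Service
public class OrderQueryService {
    
    @Autowired
    private OrderMapper orderMapper;
    
    @SentinelResource(value = "getOrderById",
                      blockHandler = "getOrderBlocked",
                      fallback = "getOrderFallback")
    public Order getOrderById(Long orderId) {
        return orderMapper.selectById(orderId);
    }
    
    // 限流/熔断时的处理:参数与原方法一致,并追加BlockException
    public Order getOrderBlocked(Long orderId, BlockException ex) {
        log.warn("订单查询被限流/熔断: orderId={}", orderId);
        return null;
    }
    
    // 业务异常时的降级处理
    public Order getOrderFallback(Long orderId, Throwable t) {
        log.error("订单查询失败,走降级: orderId={}", orderId, t);
        return null;
    }
}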

Nacos服务注册发现:

// 服务提供者配置
@SpringBootApplication
@EnableDiscoveryClient
public class OrderServiceApplication {
    public static void main(String[] args) {
        SpringApplication.run(OrderServiceApplication.class, args);
    }
}

# application.yml
spring:
  application:
    name: order-service
  cloud:
    nacos:
      discovery:
        server-addr: localhost:8848
        namespace: dev
        group: DEFAULT_GROUP
        metadata:
          version: 1.0.0
          region: beijing

// 服务消费者
@RestController
public class UserController {
    
    @Autowired
    private OrderServiceClient orderServiceClient;
    
    @GetMapping("/users/{userId}/orders")
    public List<Order> getUserOrders(@PathVariable Long userId) {
        return orderServiceClient.getOrdersByUserId(userId);
    }
}

// Feign客户端(fallback降级生效需要开启Feign的熔断支持,如配置feign.circuitbreaker.enabled=true)
@FeignClient(name = "order-service", fallback = OrderServiceFallback.class)
public interface OrderServiceClient {
    
    @GetMapping("/orders/user/{userId}")
    List<Order> getOrdersByUserId(@PathVariable("userId") Long userId);
}

// 降级处理
@Component
public class OrderServiceFallback implements OrderServiceClient {
    
    @Override
    public List<Order> getOrdersByUserId(Long userId) {
        log.warn("订单服务调用失败,返回空列表: userId={}", userId);
        return Collections.emptyList();
    }
}
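
除了注册发现,Nacos 还承担配置中心的角色(对应表中的 Nacos Config),服务端地址等在 bootstrap.yml 的 spring.cloud.nacos.config 下配置。配合 @RefreshScope 可以做到配置热更新,示意如下(配置项名称为示例):

@RestController
@RefreshScope // Nacos中配置变更后,本Bean里的@Value值会被刷新
public class OrderConfigController {
    
    @Value("${order.timeout-seconds:30}")
    private int timeoutSeconds;
    
    @GetMapping("/config/order-timeout")
    public int getOrderTimeout() {
        return timeoutSeconds;
    }
}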

服务网关设计

Spring Cloud Gateway配置:

@Configuration
public class GatewayConfig {
    
    @Bean
    public RouteLocator customRouteLocator(RouteLocatorBuilder builder) {
        return builder.routes()
            // 用户服务路由
            .route("user-service", r -> r
                .path("/api/users/**")
                .filters(f -> f
                    .stripPrefix(1)  // 去掉/api前缀
                    .addRequestHeader("X-Gateway", "spring-cloud-gateway")
                    .circuitBreaker(config -> config
                        .setName("user-service-cb")
                        .setFallbackUri("forward:/fallback/user"))
                    .retry(config -> config
                        .setRetries(3)
                        .setStatuses(HttpStatus.INTERNAL_SERVER_ERROR)))
                .uri("lb://user-service"))
            
            // 订单服务路由
            .route("order-service", r -> r
                .path("/api/orders/**")
                .filters(f -> f
                    .stripPrefix(1)
                    .requestRateLimiter(config -> config
                        .setRateLimiter(redisRateLimiter())
                        .setKeyResolver(userKeyResolver())))
                .uri("lb://order-service"))
            
            .build();
    }
    
    @Bean
    public RedisRateLimiter redisRateLimiter() {
        return new RedisRateLimiter(10, 20); // 每秒10个请求,突发20个
    }
    
    @Bean
    public KeyResolver userKeyResolver() {
        return exchange -> exchange.getRequest().getHeaders()
            .getFirst("X-User-Id");
    }
}

// 全局过滤器
@Component
public class AuthGlobalFilter implements GlobalFilter, Ordered {
    
    @Autowired
    private JwtTokenUtil jwtTokenUtil;
    
    @Override
    public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
        ServerHttpRequest request = exchange.getRequest();
        String path = request.getURI().getPath();
        
        // 白名单路径
        if (isWhiteListPath(path)) {
            return chain.filter(exchange);
        }
        
        // 获取token
        String token = request.getHeaders().getFirst("Authorization");
        if (token == null || !token.startsWith("Bearer ")) {
            return unauthorized(exchange);
        }
        
        token = token.substring(7);
        
        // 验证token
        try {
            Claims claims = jwtTokenUtil.parseToken(token);
            String userId = claims.getSubject();
            
            // 添加用户信息到请求头
            ServerHttpRequest mutatedRequest = request.mutate()
                .header("X-User-Id", userId)
                .header("X-User-Name", claims.get("username", String.class))
                .build();
            
            return chain.filter(exchange.mutate().request(mutatedRequest).build());
            
        } catch (Exception e) {
            log.error("Token验证失败", e);
            return unauthorized(exchange);
        }
    }
    
    private boolean isWhiteListPath(String path) {
        return path.startsWith("/api/auth/") || 
               path.startsWith("/api/public/") ||
               path.equals("/health");
    }
    
    private Mono<Void> unauthorized(ServerWebExchange exchange) {
        ServerHttpResponse response = exchange.getResponse();
        response.setStatusCode(HttpStatus.UNAUTHORIZED);
        response.getHeaders().add("Content-Type", "application/json");
        
        String body = "{\"code\":401,\"message\":\"Unauthorized\"}";
        DataBuffer buffer = response.bufferFactory().wrap(body.getBytes());
        return response.writeWith(Mono.just(buffer));
    }
    
    @Override
    public int getOrder() {
        return -100; // 优先级高
    }
}
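
前面路由配置里 circuitBreaker 指定的 forward:/fallback/user,需要网关应用本地提供对应的降级端点,示意如下(响应内容为示例):

@RestController
public class GatewayFallbackController {
    
    // 用户服务熔断后的统一降级响应(Gateway基于WebFlux,返回Mono)
    @RequestMapping("/fallback/user")
    public Mono<Map<String, Object>> userFallback() {
        Map<String, Object> body = new HashMap<>();
        body.put("code", 503);
        body.put("message", "用户服务暂时不可用,请稍后重试");
        return Mono.just(body);
    }
}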

💡 字节真题:高并发消息队列设计

面试场景:

面试官:"设计一个支持百万QPS的消息队列系统,需要考虑哪些问题?如何保证消息不丢失?"

设计方案:

// 高性能消息队列设计
@Component
public class HighPerformanceMessageQueue {
    
    // 使用Disruptor实现高性能队列
    private final RingBuffer<MessageEvent> ringBuffer;
    private final Disruptor<MessageEvent> disruptor;
    private final ExecutorService executor;
    
    // 消息持久化
    @Autowired
    private MessagePersistenceService persistenceService;
    
    // 消息复制
    @Autowired
    private MessageReplicationService replicationService;
    
    public HighPerformanceMessageQueue() {
        this.executor = Executors.newCachedThreadPool();
        
        // 创建Disruptor(注:基于Executor的构造方法在较新的Disruptor版本中已废弃,可改为传入ThreadFactory)
        this.disruptor = new Disruptor<>(
            MessageEvent::new,
            1024 * 1024, // 环形缓冲区大小,必须是2的幂
            executor,
            ProducerType.MULTI, // 多生产者
            new YieldingWaitStrategy() // 等待策略
        );
        
        // 设置消息处理器
        disruptor.handleEventsWith(new MessageEventHandler());
        
        this.ringBuffer = disruptor.getRingBuffer();
        disruptor.start();
    }
    
    // 发送消息
    public boolean sendMessage(String topic, String key, byte[] payload) {
        try {
            long sequence = ringBuffer.next();
            try {
                MessageEvent event = ringBuffer.get(sequence);
                event.setTopic(topic);
                event.setKey(key);
                event.setPayload(payload);
                event.setTimestamp(System.currentTimeMillis());
                event.setMessageId(generateMessageId());
                
                return true;
            } finally {
                ringBuffer.publish(sequence);
            }
        } catch (Exception e) {
            log.error("发送消息失败", e);
            return false;
        }
    }
    
    // 消息事件处理器
    private class MessageEventHandler implements EventHandler<MessageEvent> {
        
        @Override
        public void onEvent(MessageEvent event, long sequence, boolean endOfBatch) {
            try {
                // 1. 消息持久化(异步)
                CompletableFuture.runAsync(() -> {
                    persistenceService.persistMessage(event);
                });
                
                // 2. 消息复制(异步)
                CompletableFuture.runAsync(() -> {
                    replicationService.replicateMessage(event);
                });
                
                // 3. 投递给消费者
                deliverToConsumers(event);
                
            } catch (Exception e) {
                log.error("处理消息事件失败", e);
                // 错误处理:重试或发送到死信队列
                handleMessageError(event, e);
            }
        }
    }
    
    // 消息投递
    private void deliverToConsumers(MessageEvent event) {
        List<Consumer> consumers = getConsumersByTopic(event.getTopic());
        
        for (Consumer consumer : consumers) {
            try {
                // 负载均衡选择消费者实例
                ConsumerInstance instance = selectConsumerInstance(consumer, event.getKey());
                
                // 异步投递
                CompletableFuture.runAsync(() -> {
                    deliverToConsumerInstance(instance, event);
                });
                
            } catch (Exception e) {
                log.error("投递消息失败: consumer={}, messageId={}", 
                    consumer.getId(), event.getMessageId(), e);
            }
        }
    }
    
    // 消息持久化服务
    @Service
    public static class MessagePersistenceService {
        
        @Autowired
        private MessageMapper messageMapper; // 批量落库用的Mapper(MessageMapper为示意的DAO接口)
        
        // 使用批量写入提高性能
        private final BlockingQueue<MessageEvent> persistenceQueue = 
            new ArrayBlockingQueue<>(10000);
        
        @PostConstruct
        public void startBatchPersistence() {
            // 批量持久化线程
            CompletableFuture.runAsync(() -> {
                List<MessageEvent> batch = new ArrayList<>();
                
                while (!Thread.currentThread().isInterrupted()) {
                    try {
                        // 收集批量消息
                        MessageEvent event = persistenceQueue.poll(100, TimeUnit.MILLISECONDS);
                        if (event != null) {
                            batch.add(event);
                        }
                        
                        // 批量大小达到阈值或超时,执行持久化
                        if (batch.size() >= 100 || 
                            (!batch.isEmpty() && System.currentTimeMillis() - batch.get(0).getTimestamp() > 1000)) {
                            
                            batchPersistMessages(batch);
                            batch.clear();
                        }
                        
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        break;
                    } catch (Exception e) {
                        log.error("批量持久化失败", e);
                    }
                }
            });
        }
        
        public void persistMessage(MessageEvent event) {
            try {
                // offer带超时:队列已满且等待超时会返回false,应与中断异常区分处理
                boolean offered = persistenceQueue.offer(event, 100, TimeUnit.MILLISECONDS);
                if (!offered) {
                    log.error("消息持久化队列已满,消息未入队: messageId={}", event.getMessageId());
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                log.error("消息入持久化队列时被中断", e);
            }
        }
        
        private void batchPersistMessages(List<MessageEvent> messages) {
            // 批量写入数据库或文件系统
            try {
                messageMapper.batchInsert(messages);
                log.debug("批量持久化消息成功,数量: {}", messages.size());
            } catch (Exception e) {
                log.error("批量持久化消息失败", e);
                // 重试或报警
            }
        }
    }
    
    // 消息复制服务(保证高可用)
    @Service
    public static class MessageReplicationService {
        
        @Value("${mq.replication.factor:3}")
        private int replicationFactor;
        
        @Autowired
        private List<ReplicaNode> replicaNodes;
        
        public void replicateMessage(MessageEvent event) {
            // 选择副本节点
            List<ReplicaNode> selectedNodes = selectReplicaNodes(replicationFactor - 1);
            
            // 并行复制到多个节点
            List<CompletableFuture<Boolean>> futures = selectedNodes.stream()
                .map(node -> CompletableFuture.supplyAsync(() -> {
                    try {
                        return node.replicateMessage(event);
                    } catch (Exception e) {
                        log.error("复制消息到节点失败: node={}, messageId={}", 
                            node.getId(), event.getMessageId(), e);
                        return false;
                    }
                }))
                .collect(Collectors.toList());
            
            // 等待大多数节点复制成功
            CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
                .thenApply(v -> {
                    long successCount = futures.stream()
                        .mapToLong(f -> f.join() ? 1 : 0)
                        .sum();
                    
                    if (successCount >= replicationFactor / 2) {
                        log.debug("消息复制成功: messageId={}, successCount={}", 
                            event.getMessageId(), successCount);
                        return true;
                    } else {
                        log.error("消息复制失败,成功节点数不足: messageId={}, successCount={}", 
                            event.getMessageId(), successCount);
                        return false;
                    }
                });
        }
        
        private List<ReplicaNode> selectReplicaNodes(int count) {
            // 选择健康的副本节点
            return replicaNodes.stream()
                .filter(ReplicaNode::isHealthy)
                .limit(count)
                .collect(Collectors.toList());
        }
    }
}

总结

消息队列和分布式系统是现代后端架构的核心技术,掌握这些知识对于Java开发者至关重要。面试中这部分内容的考察重点:

核心要点回顾:

  1. 消息队列:选型对比、可靠性保证、性能优化
  2. 分布式事务:2PC/TCC/Saga等方案的优缺点和适用场景
  3. 微服务架构:服务注册发现、配置中心、服务网关
  4. 系统设计:高并发、高可用、一致性保证

面试建议:

  • 理解各种技术方案的trade-off,能够根据业务场景选择合适的方案
  • 掌握分布式系统的核心理论(CAP、BASE等)
  • 具备实际项目经验,能够分享遇到的问题和解决方案
  • 关注技术发展趋势,如云原生、Service Mesh等

本章核心要点:

  • ✅ 消息队列选型和使用场景分析
  • ✅ 分布式事务解决方案对比
  • ✅ 微服务架构核心组件
  • ✅ 高并发消息队列设计
  • ✅ 服务治理和监控体系

下一章预告: 高并发系统设计 - 负载均衡、限流降级、缓存架构等系统架构设计

#java面试##秋招笔面试记录##面试##java速通##秋招投递攻略#