Java秋招面试之消息队列与分布式话题
第9章 消息队列与分布式
面试重要程度:⭐⭐⭐⭐
常见提问方式:消息可靠性保证、分布式事务、服务治理
预计阅读时间:40分钟
开场白
兄弟,消息队列和分布式系统是现代后端架构的核心!随着微服务架构的普及,这块知识在面试中的重要性越来越高。不管是RabbitMQ、Kafka,还是分布式事务、服务治理,都是面试官爱考的重点。
今天我们就把这些分布式核心技术搞透,让你在面试中展现出对大型系统架构的深度理解。
📨 9.1 消息队列选型对比
RabbitMQ vs Kafka vs RocketMQ
面试必问:
面试官:"常见的消息队列有哪些?各自的特点和适用场景是什么?"
三大消息队列对比:
开发语言 | Erlang | Scala/Java | Java |
协议支持 | AMQP, STOMP, MQTT | 自定义协议 | 自定义协议 |
消息顺序 | 支持 | 分区内有序 | 支持 |
消息可靠性 | 高 | 高 | 高 |
性能 | 万级QPS | 十万级QPS | 十万级QPS |
延时 | 微秒级 | 毫秒级 | 毫秒级 |
可用性 | 高(镜像队列) | 高(副本机制) | 高(主从架构) |
消息回溯 | 不支持 | 支持 | 支持 |
消息过滤 | 不支持 | 不支持 | 支持 |
事务消息 | 支持 | 支持 | 支持 |
定时消息 | 插件支持 | 不支持 | 支持 |
使用场景分析:
// RabbitMQ适用场景 /* 1. 业务逻辑复杂,需要多种消息模式 2. 对消息可靠性要求极高 3. 需要复杂的路由规则 4. 系统规模中等,QPS要求不是特别高 */ // Kafka适用场景 /* 1. 大数据处理,日志收集 2. 高吞吐量场景 3. 流式数据处理 4. 需要消息回溯功能 */ // RocketMQ适用场景 /* 1. 电商、金融等对可靠性要求高的场景 2. 需要事务消息 3. 需要定时消息、延时消息 4. 需要消息过滤功能 */
RabbitMQ核心概念
AMQP模型:
@Configuration @EnableRabbit public class RabbitMQConfig { // 1. 直连交换机(Direct Exchange) @Bean public DirectExchange directExchange() { return new DirectExchange("direct.exchange", true, false); } // 2. 主题交换机(Topic Exchange) @Bean public TopicExchange topicExchange() { return new TopicExchange("topic.exchange", true, false); } // 3. 扇形交换机(Fanout Exchange) @Bean public FanoutExchange fanoutExchange() { return new FanoutExchange("fanout.exchange", true, false); } // 4. 头部交换机(Headers Exchange) @Bean public HeadersExchange headersExchange() { return new HeadersExchange("headers.exchange", true, false); } // 队列定义 @Bean public Queue orderQueue() { return QueueBuilder.durable("order.queue") .withArgument("x-message-ttl", 60000) // 消息TTL .withArgument("x-max-length", 1000) // 队列最大长度 .build(); } @Bean public Queue deadLetterQueue() { return QueueBuilder.durable("order.dlq").build(); } // 绑定关系 @Bean public Binding orderBinding() { return BindingBuilder.bind(orderQueue()) .to(directExchange()) .with("order.create"); } // 死信队列绑定 @Bean public Queue orderQueueWithDLX() { return QueueBuilder.durable("order.queue.dlx") .withArgument("x-dead-letter-exchange", "dlx.exchange") .withArgument("x-dead-letter-routing-key", "order.dead") .build(); } }
消息生产者:
@Service public class OrderMessageProducer { @Autowired private RabbitTemplate rabbitTemplate; // 发送普通消息 public void sendOrderMessage(Order order) { rabbitTemplate.convertAndSend("direct.exchange", "order.create", order); } // 发送延时消息 public void sendDelayMessage(Order order, int delaySeconds) { rabbitTemplate.convertAndSend("direct.exchange", "order.delay", order, message -> { message.getMessageProperties().setExpiration(String.valueOf(delaySeconds * 1000)); return message; }); } // 发送事务消息 @Transactional public void sendTransactionalMessage(Order order) { // 本地事务操作 orderService.saveOrder(order); // 发送消息(在同一事务中) rabbitTemplate.convertAndSend("direct.exchange", "order.create", order); } // 发送确认消息 public void sendConfirmMessage(Order order) { // 设置确认回调 rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> { if (ack) { log.info("消息发送成功: {}", correlationData); } else { log.error("消息发送失败: {}, 原因: {}", correlationData, cause); // 重试或记录失败日志 } }); // 设置返回回调 rabbitTemplate.setReturnsCallback(returned -> { log.error("消息被退回: {}", returned); // 处理被退回的消息 }); CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString()); rabbitTemplate.convertAndSend("direct.exchange", "order.create", order, correlationData); } }
消息消费者:
@Component public class OrderMessageConsumer { @Autowired private OrderService orderService; // 基本消费 @RabbitListener(queues = "order.queue") public void handleOrderMessage(Order order, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) { try { log.info("收到订单消息: {}", order); orderService.processOrder(order); // 手动确认 channel.basicAck(deliveryTag, false); } catch (Exception e) { log.error("处理订单消息失败", e); try { // 拒绝消息,重新入队 channel.basicNack(deliveryTag, false, true); } catch (IOException ioException) { log.error("拒绝消息失败", ioException); } } } // 批量消费 @RabbitListener(queues = "order.batch.queue", containerFactory = "batchRabbitListenerContainerFactory") public void handleBatchOrderMessages(List<Order> orders) { log.info("批量处理订单消息,数量: {}", orders.size()); orderService.batchProcessOrders(orders); } // 死信队列消费 @RabbitListener(queues = "order.dlq") public void handleDeadLetterMessage(Order order, @Header Map<String, Object> headers) { log.warn("处理死信消息: {}, headers: {}", order, headers); // 记录失败原因 String reason = (String) headers.get("x-first-death-reason"); orderService.recordFailedOrder(order, reason); // 可以选择重试或人工处理 } }
Kafka核心概念
Kafka架构:
@Configuration @EnableKafka public class KafkaConfig { // 生产者配置 @Bean public ProducerFactory<String, Object> producerFactory() { Map<String, Object> props = new HashMap<>(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class); // 可靠性配置 props.put(ProducerConfig.ACKS_CONFIG, "all"); // 等待所有副本确认 props.put(ProducerConfig.RETRIES_CONFIG, 3); // 重试次数 props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true); // 幂等性 // 性能配置 props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384); // 批次大小 props.put(ProducerConfig.LINGER_MS_CONFIG, 10); // 等待时间 props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432); // 缓冲区大小 return new DefaultKafkaProducerFactory<>(props); } @Bean public KafkaTemplate<String, Object> kafkaTemplate() { return new KafkaTemplate<>(producerFactory()); } // 消费者配置 @Bean public ConsumerFactory<String, Object> consumerFactory() { Map<String, Object> props = new HashMap<>(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); props.put(ConsumerConfig.GROUP_ID_CONFIG, "order-service"); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class); // 消费配置 props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); // 手动提交 props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100); // 每次拉取记录数 return new DefaultKafkaConsumerFactory<>(props); } @Bean public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); // 并发配置 factory.setConcurrency(3); // 3个消费者线程 // 手动确认模式 factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE); return factory; } }
Kafka生产者:
@Service public class KafkaOrderProducer { @Autowired private KafkaTemplate<String, Object> kafkaTemplate; // 发送消息 public void sendOrderEvent(String topic, Order order) { kafkaTemplate.send(topic, order.getId().toString(), order) .addCallback( result -> log.info("消息发送成功: {}", result), failure -> log.error("消息发送失败", failure) ); } // 发送事务消息 @Transactional public void sendTransactionalOrderEvent(Order order) { kafkaTemplate.executeInTransaction(operations -> { // 发送多个相关消息 operations.send("order-created", order.getId().toString(), order); operations.send("inventory-update", order.getProductId().toString(), new InventoryUpdateEvent(order.getProductId(), -order.getQuantity())); operations.send("payment-request", order.getId().toString(), new PaymentRequestEvent(order.getId(), order.getAmount())); return true; }); } // 发送分区消息 public void sendPartitionedMessage(Order order) { // 根据用户ID分区,保证同一用户的消息有序 int partition = Math.abs(order.getUserId().hashCode()) % 3; kafkaTemplate.send("order-events", partition, order.getId().toString(), order); } }
Kafka消费者:
@Component public class KafkaOrderConsumer { @Autowired private OrderService orderService; // 基本消费 @KafkaListener(topics = "order-created", groupId = "order-service") public void handleOrderCreated(ConsumerRecord<String, Order> record, Acknowledgment ack) { try { Order order = record.value(); log.info("处理订单创建事件: partition={}, offset={}, order={}", record.partition(), record.offset(), order); orderService.handleOrderCreated(order); // 手动确认 ack.acknowledge(); } catch (Exception e) { log.error("处理订单创建事件失败", e); // 不确认,消息会重新消费 } } // 批量消费 @KafkaListener(topics = "order-events", containerFactory = "batchKafkaListenerContainerFactory") public void handleOrderEventsBatch(List<ConsumerRecord<String, Order>> records, Acknowledgment ack) { log.info("批量处理订单事件,数量: {}", records.size()); List<Order> orders = records.stream() .map(ConsumerRecord::value) .collect(Collectors.toList()); orderService.batchProcessOrders(orders); ack.acknowledge(); } // 错误处理 @KafkaListener(topics = "order-created") public void handleOrderCreatedWithRetry(Order order, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition, @Header(KafkaHeaders.OFFSET) long offset) { try { orderService.handleOrderCreated(order); } catch (RetryableException e) { log.warn("处理失败,将重试: topic={}, partition={}, offset={}", topic, partition, offset); throw e; // 抛出异常,触发重试 } catch (Exception e) { log.error("处理失败,发送到死信队列: topic={}, partition={}, offset={}", topic, partition, offset); sendToDeadLetterTopic(order, e); } } private void sendToDeadLetterTopic(Order order, Exception e) { // 发送到死信主题 kafkaTemplate.send("order-created-dlt", order.getId().toString(), new DeadLetterMessage(order, e.getMessage())); } }
🔄 9.2 分布式事务解决方案
2PC、3PC、TCC、Saga模式
面试重点:
面试官:"分布式事务有哪些解决方案?各自的优缺点是什么?"
分布式事务对比:
2PC | 强一致 | 低 | 低 | 低 | 小规模系统 |
3PC | 强一致 | 中 | 低 | 中 | 改进2PC |
TCC | 最终一致 | 高 | 中 | 高 | 金融支付 |
Saga | 最终一致 | 高 | 高 | 中 | 长流程业务 |
消息事务 | 最终一致 | 高 | 高 | 中 | 异步场景 |
TCC模式实现
TCC框架实现:
// TCC接口定义 public interface TccTransaction { /** * Try阶段:尝试执行业务,预留资源 */ boolean tryExecute(TccContext context); /** * Confirm阶段:确认执行业务,提交资源 */ boolean confirmExecute(TccContext context); /** * Cancel阶段:取消执行业务,释放资源 */ boolean cancelExecute(TccContext context); } // 账户服务TCC实现 @Service public class AccountTccService implements TccTransaction { @Autowired private AccountMapper accountMapper; @Autowired private AccountFreezeMapper freezeMapper; @Override public boolean tryExecute(TccContext context) { TransferRequest request = context.getRequest(TransferRequest.class); try { // 1. 检查账户余额 Account account = accountMapper.selectById(request.getFromAccountId()); if (account.getBalance().compareTo(request.getAmount()) < 0) { return false; // 余额不足 } // 2. 冻结资金 AccountFreeze freeze = new AccountFreeze(); freeze.setTransactionId(context.getTransactionId()); freeze.setAccountId(request.getFromAccountId()); freeze.setAmount(request.getAmount()); freeze.setStatus(FreezeStatus.FROZEN); freeze.setCreateTime(LocalDateTime.now()); freezeMapper.insert(freeze); // 3. 更新账户余额(预扣) account.setBalance(account.getBalance().subtract(request.getAmount())); account.setFrozenAmount(account.getFrozenAmount().add(request.getAmount())); accountMapper.updateById(account); log.info("TCC Try阶段成功: transactionId={}, amount={}", context.getTransactionId(), request.getAmount()); return true; } catch (Exception e) { log.error("TCC Try阶段失败", e); return false; } } @Override public boolean confirmExecute(TccContext context) { String transactionId = context.getTransactionId(); try { // 1. 查询冻结记录 AccountFreeze freeze = freezeMapper.selectByTransactionId(transactionId); if (freeze == null || freeze.getStatus() == FreezeStatus.CONFIRMED) { return true; // 幂等处理 } // 2. 确认扣款,清除冻结金额 Account account = accountMapper.selectById(freeze.getAccountId()); account.setFrozenAmount(account.getFrozenAmount().subtract(freeze.getAmount())); accountMapper.updateById(account); // 3. 更新冻结记录状态 freeze.setStatus(FreezeStatus.CONFIRMED); freeze.setUpdateTime(LocalDateTime.now()); freezeMapper.updateById(freeze); log.info("TCC Confirm阶段成功: transactionId={}", transactionId); return true; } catch (Exception e) { log.error("TCC Confirm阶段失败", e); return false; } } @Override public boolean cancelExecute(TccContext context) { String transactionId = context.getTransactionId(); try { // 1. 查询冻结记录 AccountFreeze freeze = freezeMapper.selectByTransactionId(transactionId); if (freeze == null || freeze.getStatus() == FreezeStatus.CANCELLED) { return true; // 幂等处理 } // 2. 恢复账户余额 Account account = accountMapper.selectById(freeze.getAccountId()); account.setBalance(account.getBalance().add(freeze.getAmount())); account.setFrozenAmount(account.getFrozenAmount().subtract(freeze.getAmount())); accountMapper.updateById(account); // 3. 更新冻结记录状态 freeze.setStatus(FreezeStatus.CANCELLED); freeze.setUpdateTime(LocalDateTime.now()); freezeMapper.updateById(freeze); log.info("TCC Cancel阶段成功: transactionId={}", transactionId); return true; } catch (Exception e) { log.error("TCC Cancel阶段失败", e); return false; } } } // TCC事务管理器 @Service public class TccTransactionManager { private final Map<String, List<TccTransaction>> transactionMap = new ConcurrentHashMap<>(); @Autowired private TccLogMapper tccLogMapper; public String beginTransaction() { String transactionId = UUID.randomUUID().toString(); transactionMap.put(transactionId, new ArrayList<>()); // 记录事务日志 TccLog log = new TccLog(); log.setTransactionId(transactionId); log.setStatus(TccStatus.TRYING); log.setCreateTime(LocalDateTime.now()); tccLogMapper.insert(log); return transactionId; } public void enlistResource(String transactionId, TccTransaction transaction) { List<TccTransaction> transactions = transactionMap.get(transactionId); if (transactions != null) { transactions.add(transaction); } } public boolean commit(String transactionId) { List<TccTransaction> transactions = transactionMap.get(transactionId); if (transactions == null) { return false; } // 执行所有Try阶段 TccContext context = new TccContext(transactionId); for (TccTransaction transaction : transactions) { if (!transaction.tryExecute(context)) { // Try失败,执行Cancel rollback(transactionId); return false; } } // 执行所有Confirm阶段 boolean allConfirmed = true; for (TccTransaction transaction : transactions) { if (!transaction.confirmExecute(context)) { allConfirmed = false; break; } } if (allConfirmed) { updateTransactionStatus(transactionId, TccStatus.CONFIRMED); transactionMap.remove(transactionId); return true; } else { // Confirm失败,需要补偿 scheduleCompensation(transactionId); return false; } } public void rollback(String transactionId) { List<TccTransaction> transactions = transactionMap.get(transactionId); if (transactions == null) { return; } TccContext context = new TccContext(transactionId); for (TccTransaction transaction : transactions) { transaction.cancelExecute(context); } updateTransactionStatus(transactionId, TccStatus.CANCELLED); transactionMap.remove(transactionId); } private void scheduleCompensation(String transactionId) { // 安排补偿任务,定期重试Confirm或Cancel CompletableFuture.runAsync(() -> { int retryCount = 0; while (retryCount < 5) { try { Thread.sleep(5000 * (retryCount + 1)); // 指数退避 if (commit(transactionId)) { break; } retryCount++; } catch (InterruptedException e) { Thread.currentThread().interrupt(); break; } } if (retryCount >= 5) { // 人工介入 updateTransactionStatus(transactionId, TccStatus.FAILED); sendAlert("TCC事务补偿失败,需要人工处理: " + transactionId); } }); } }
Saga模式实现
Saga编排器:
// Saga步骤定义 public interface SagaStep { String getName(); boolean execute(SagaContext context); boolean compensate(SagaContext context); } // 订单Saga步骤 @Component public class CreateOrderStep implements SagaStep { @Autowired private OrderService orderService; @Override public String getName() { return "CreateOrder"; } @Override public boolean execute(SagaContext context) { OrderRequest request = context.getRequest(OrderRequest.class); try { Order order = orderService.createOrder(request); context.setData("orderId", order.getId()); return true; } catch (Exception e) { log.error("创建订单失败", e); return false; } } @Override public boolean compensate(SagaContext context) { Long orderId = context.getData("orderId", Long.class); if (orderId != null) { try { orderService.cancelOrder(orderId); return true; } catch (Exception e) { log.error("取消订单失败", e); return false; } } return true; } } @Component public class ReserveInventoryStep implements SagaStep { @Autowired private InventoryService inventoryService; @Override public String getName() { return "ReserveInventory"; } @Override public boolean execute(SagaContext context) { OrderRequest request = context.getRequest(OrderRequest.class); try { String reservationId = inventoryService.reserveInventory( request.getProductId(), request.getQuantity()); context.setData("reservationId", reservationId); return true; } catch (Exception e) { log.error("预留库存失败", e); return false; } } @Override public boolean compensate(SagaContext context) { String reservationId = context.getData("reservationId", String.class); if (reservationId != null) { try { inventoryService.releaseReservation(reservationId); return true; } catch (Exception e) { log.error("释放库存预留失败", e); return false; } } return true; } } // Saga编排器 @Service public class SagaOrchestrator { @Autowired private List<SagaStep> sagaSteps; @Autowired private SagaLogMapper sagaLogMapper; public boolean executeSaga(String sagaId, Object request) { SagaContext context = new SagaContext(sagaId, request); List<SagaStep> executedSteps = new ArrayList<>(); try { // 记录Saga开始 recordSagaLog(sagaId, SagaStatus.STARTED, null); // 顺序执行所有步骤 for (SagaStep step : sagaSteps) { recordSagaLog(sagaId, SagaStatus.EXECUTING, step.getName()); if (step.execute(context)) { executedSteps.add(step); recordSagaLog(sagaId, SagaStatus.STEP_COMPLETED, step.getName()); } else { // 步骤失败,执行补偿 recordSagaLog(sagaId, SagaStatus.STEP_FAILED, step.getName()); compensate(sagaId, executedSteps, context); return false; } } // 所有步骤成功 recordSagaLog(sagaId, SagaStatus.COMPLETED, null); return true; } catch (Exception e) { log.error("Saga执行异常: sagaId={}", sagaId, e); recordSagaLog(sagaId, SagaStatus.FAILED, e.getMessage()); compensate(sagaId, executedSteps, context); return false; } } private void compensate(String sagaId, List<SagaStep> executedSteps, SagaContext context) { recordSagaLog(sagaId, SagaStatus.COMPENSATING, null); // 逆序执行补偿 Collections.reverse(executedSteps); for (SagaStep step : executedSteps) { try { recordSagaLog(sagaId, SagaStatus.COMPENSATING, step.getName()); step.compensate(context); recordSagaLog(sagaId, SagaStatus.COMPENSATED, step.getName()); } catch (Exception e) { log.error("补偿步骤失败: sagaId={}, step={}", sagaId, step.getName(), e); recordSagaLog(sagaId, SagaStatus.COMPENSATION_FAILED, step.getName()); // 继续补偿其他步骤 } } recordSagaLog(sagaId, SagaStatus.COMPENSATED, null); } private void recordSagaLog(String sagaId, SagaStatus status, String stepName) { SagaLog log = new SagaLog(); log.setSagaId(sagaId); log.setStatus(status); log.setStepName(stepName); log.setCreateTime(LocalDateTime.now()); sagaLogMapper.insert(log); } }
🏗️ 9.3 微服务架构设计
Spring Cloud Alibaba技术栈
面试重点:
面试官:"微服务架构中有哪些核心组件?Spring Cloud Alibaba和Netflix有什么区别?"
技术栈对比:
服务注册发现 | Eureka | Nacos |
配置中心 | Config Server | Nacos Config |
服务网关 | Zuul/Gateway | Gateway |
负载均衡 | Ribbon | LoadBalancer |
熔断器 | Hystrix | Sentinel |
分布式事务 | - | Seata |
消息队列 | - | RocketMQ |
Nacos服务注册发现:
// 服务提供者配置 @SpringBootApplication @EnableDiscoveryClient public class OrderServiceApplication { public static void main(String[] args) { SpringApplication.run(OrderServiceApplication.class, args); } } // application.yml spring: application: name: order-service cloud: nacos: discovery: server-addr: localhost:8848 namespace: dev group: DEFAULT_GROUP metadata: version: 1.0.0 region: beijing // 服务消费者 @RestController public class UserController { @Autowired private OrderServiceClient orderServiceClient; @GetMapping("/users/{userId}/orders") public List<Order> getUserOrders(@PathVariable Long userId) { return orderServiceClient.getOrdersByUserId(userId); } } // Feign客户端 @FeignClient(name = "order-service", fallback = OrderServiceFallback.class) public interface OrderServiceClient { @GetMapping("/orders/user/{userId}") List<Order> getOrdersByUserId(@PathVariable("userId") Long userId); } // 降级处理 @Component public class OrderServiceFallback implements OrderServiceClient { @Override public List<Order> getOrdersByUserId(Long userId) { log.warn("订单服务调用失败,返回空列表: userId={}", userId); return Collections.emptyList(); } }
服务网关设计
Spring Cloud Gateway配置:
@Configuration public class GatewayConfig { @Bean public RouteLocator customRouteLocator(RouteLocatorBuilder builder) { return builder.routes() // 用户服务路由 .route("user-service", r -> r .path("/api/users/**") .filters(f -> f .stripPrefix(1) // 去掉/api前缀 .addRequestHeader("X-Gateway", "spring-cloud-gateway") .circuitBreaker(config -> config .setName("user-service-cb") .setFallbackUri("forward:/fallback/user")) .retry(config -> config .setRetries(3) .setStatuses(HttpStatus.INTERNAL_SERVER_ERROR))) .uri("lb://user-service")) // 订单服务路由 .route("order-service", r -> r .path("/api/orders/**") .filters(f -> f .stripPrefix(1) .rateLimit(config -> config .setRateLimiter(redisRateLimiter()) .setKeyResolver(userKeyResolver()))) .uri("lb://order-service")) .build(); } @Bean public RedisRateLimiter redisRateLimiter() { return new RedisRateLimiter(10, 20); // 每秒10个请求,突发20个 } @Bean public KeyResolver userKeyResolver() { return exchange -> exchange.getRequest().getHeaders() .getFirst("X-User-Id"); } } // 全局过滤器 @Component public class AuthGlobalFilter implements GlobalFilter, Ordered { @Autowired private JwtTokenUtil jwtTokenUtil; @Override public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) { ServerHttpRequest request = exchange.getRequest(); String path = request.getURI().getPath(); // 白名单路径 if (isWhiteListPath(path)) { return chain.filter(exchange); } // 获取token String token = request.getHeaders().getFirst("Authorization"); if (token == null || !token.startsWith("Bearer ")) { return unauthorized(exchange); } token = token.substring(7); // 验证token try { Claims claims = jwtTokenUtil.parseToken(token); String userId = claims.getSubject(); // 添加用户信息到请求头 ServerHttpRequest mutatedRequest = request.mutate() .header("X-User-Id", userId) .header("X-User-Name", claims.get("username", String.class)) .build(); return chain.filter(exchange.mutate().request(mutatedRequest).build()); } catch (Exception e) { log.error("Token验证失败", e); return unauthorized(exchange); } } private boolean isWhiteListPath(String path) { return path.startsWith("/api/auth/") || path.startsWith("/api/public/") || path.equals("/health"); } private Mono<Void> unauthorized(ServerWebExchange exchange) { ServerHttpResponse response = exchange.getResponse(); response.setStatusCode(HttpStatus.UNAUTHORIZED); response.getHeaders().add("Content-Type", "application/json"); String body = "{\"code\":401,\"message\":\"Unauthorized\"}"; DataBuffer buffer = response.bufferFactory().wrap(body.getBytes()); return response.writeWith(Mono.just(buffer)); } @Override public int getOrder() { return -100; // 优先级高 } }
💡 字节真题:高并发消息队列设计
面试场景:
面试官:"设计一个支持百万QPS的消息队列系统,需要考虑哪些问题?如何保证消息不丢失?"
设计方案:
// 高性能消息队列设计 @Component public class HighPerformanceMessageQueue { // 使用Disruptor实现高性能队列 private final RingBuffer<MessageEvent> ringBuffer; private final Disruptor<MessageEvent> disruptor; private final ExecutorService executor; // 消息持久化 @Autowired private MessagePersistenceService persistenceService; // 消息复制 @Autowired private MessageReplicationService replicationService; public HighPerformanceMessageQueue() { this.executor = Executors.newCachedThreadPool(); // 创建Disruptor this.disruptor = new Disruptor<>( MessageEvent::new, 1024 * 1024, // 环形缓冲区大小 executor, ProducerType.MULTI, // 多生产者 new YieldingWaitStrategy() // 等待策略 ); // 设置消息处理器 disruptor.handleEventsWith(new MessageEventHandler()); this.ringBuffer = disruptor.getRingBuffer(); disruptor.start(); } // 发送消息 public boolean sendMessage(String topic, String key, byte[] payload) { try { long sequence = ringBuffer.next(); try { MessageEvent event = ringBuffer.get(sequence); event.setTopic(topic); event.setKey(key); event.setPayload(payload); event.setTimestamp(System.currentTimeMillis()); event.setMessageId(generateMessageId()); return true; } finally { ringBuffer.publish(sequence); } } catch (Exception e) { log.error("发送消息失败", e); return false; } } // 消息事件处理器 private class MessageEventHandler implements EventHandler<MessageEvent> { @Override public void onEvent(MessageEvent event, long sequence, boolean endOfBatch) { try { // 1. 消息持久化(异步) CompletableFuture.runAsync(() -> { persistenceService.persistMessage(event); }); // 2. 消息复制(异步) CompletableFuture.runAsync(() -> { replicationService.replicateMessage(event); }); // 3. 投递给消费者 deliverToConsumers(event); } catch (Exception e) { log.error("处理消息事件失败", e); // 错误处理:重试或发送到死信队列 handleMessageError(event, e); } } } // 消息投递 private void deliverToConsumers(MessageEvent event) { List<Consumer> consumers = getConsumersByTopic(event.getTopic()); for (Consumer consumer : consumers) { try { // 负载均衡选择消费者实例 ConsumerInstance instance = selectConsumerInstance(consumer, event.getKey()); // 异步投递 CompletableFuture.runAsync(() -> { deliverToConsumerInstance(instance, event); }); } catch (Exception e) { log.error("投递消息失败: consumer={}, messageId={}", consumer.getId(), event.getMessageId(), e); } } } // 消息持久化服务 @Service public static class MessagePersistenceService { // 使用批量写入提高性能 private final BlockingQueue<MessageEvent> persistenceQueue = new ArrayBlockingQueue<>(10000); @PostConstruct public void startBatchPersistence() { // 批量持久化线程 CompletableFuture.runAsync(() -> { List<MessageEvent> batch = new ArrayList<>(); while (!Thread.currentThread().isInterrupted()) { try { // 收集批量消息 MessageEvent event = persistenceQueue.poll(100, TimeUnit.MILLISECONDS); if (event != null) { batch.add(event); } // 批量大小达到阈值或超时,执行持久化 if (batch.size() >= 100 || (!batch.isEmpty() && System.currentTimeMillis() - batch.get(0).getTimestamp() > 1000)) { batchPersistMessages(batch); batch.clear(); } } catch (InterruptedException e) { Thread.currentThread().interrupt(); break; } catch (Exception e) { log.error("批量持久化失败", e); } } }); } public void persistMessage(MessageEvent event) { try { persistenceQueue.offer(event, 100, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { Thread.currentThread().interrupt(); log.error("消息持久化队列已满", e); } } private void batchPersistMessages(List<MessageEvent> messages) { // 批量写入数据库或文件系统 try { messageMapper.batchInsert(messages); log.debug("批量持久化消息成功,数量: {}", messages.size()); } catch (Exception e) { log.error("批量持久化消息失败", e); // 重试或报警 } } } // 消息复制服务(保证高可用) @Service public static class MessageReplicationService { @Value("${mq.replication.factor:3}") private int replicationFactor; @Autowired private List<ReplicaNode> replicaNodes; public void replicateMessage(MessageEvent event) { // 选择副本节点 List<ReplicaNode> selectedNodes = selectReplicaNodes(replicationFactor - 1); // 并行复制到多个节点 List<CompletableFuture<Boolean>> futures = selectedNodes.stream() .map(node -> CompletableFuture.supplyAsync(() -> { try { return node.replicateMessage(event); } catch (Exception e) { log.error("复制消息到节点失败: node={}, messageId={}", node.getId(), event.getMessageId(), e); return false; } })) .collect(Collectors.toList()); // 等待大多数节点复制成功 CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])) .thenApply(v -> { long successCount = futures.stream() .mapToLong(f -> f.join() ? 1 : 0) .sum(); if (successCount >= replicationFactor / 2) { log.debug("消息复制成功: messageId={}, successCount={}", event.getMessageId(), successCount); return true; } else { log.error("消息复制失败,成功节点数不足: messageId={}, successCount={}", event.getMessageId(), successCount); return false; } }); } private List<ReplicaNode> selectReplicaNodes(int count) { // 选择健康的副本节点 return replicaNodes.stream() .filter(ReplicaNode::isHealthy) .limit(count) .collect(Collectors.toList()); } } }
总结
消息队列和分布式系统是现代后端架构的核心技术,掌握这些知识对于Java开发者至关重要。面试中这部分内容的考察重点:
核心要点回顾:
- 消息队列:选型对比、可靠性保证、性能优化
- 分布式事务:2PC/TCC/Saga等方案的优缺点和适用场景
- 微服务架构:服务注册发现、配置中心、服务网关
- 系统设计:高并发、高可用、一致性保证
面试建议:
- 理解各种技术方案的trade-off,能够根据业务场景选择合适的方案
- 掌握分布式系统的核心理论(CAP、BASE等)
- 具备实际项目经验,能够分享遇到的问题和解决方案
- 关注技术发展趋势,如云原生、Service Mesh等
本章核心要点:
- ✅ 消息队列选型和使用场景分析
- ✅ 分布式事务解决方案对比
- ✅ 微服务架构核心组件
- ✅ 高并发消息队列设计
- ✅ 服务治理和监控体系
下一章预告: 高并发系统设计 - 负载均衡、限流降级、缓存架构等系统架构设计
#java面试##秋招笔面试记录##面试##java速通##秋招投递攻略#Java面试圣经