Spring Boot轻松玩转Kafka实战
Spring Boot 集成 Kafka 详解
Kafka 简介
Apache Kafka 是一个分布式流处理平台,具有高吞吐量、低延迟和可扩展性。常用于实时数据管道、消息队列和事件驱动架构。Spring Boot 通过 Spring Kafka 项目提供了与 Kafka 的无缝集成。
环境准备
确保已安装 JDK 8+、Maven 或 Gradle 以及 Kafka 环境。可通过以下命令启动 Kafka 本地服务:
# 启动 ZooKeeper
bin/zookeeper-server-start.sh config/zookeeper.properties
# 启动 Kafka
bin/kafka-server-start.sh config/server.properties
添加依赖
在 Spring Boot 项目中引入 Spring Kafka 依赖:
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
配置 Kafka
在 application.properties 或 application.yml 中配置 Kafka 连接信息:
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=my-group
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
生产者实现
创建 Kafka 生产者发送消息:
@RestController
public class KafkaProducerController {
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
@GetMapping("/send/{message}")
public String sendMessage(@PathVariable String message) {
kafkaTemplate.send("test-topic", message);
return "Message sent: " + message;
}
}
消费者实现
通过 @KafkaListener 注解实现消息消费:
@Service
public class KafkaConsumerService {
@KafkaListener(topics = "test-topic", groupId = "my-group")
public void listen(String message) {
System.out.println("Received Message: " + message);
}
}
高级配置
自定义生产者/消费者工厂:
@Configuration
public class KafkaConfig {
@Bean
public ProducerFactory<String, String> producerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return new DefaultKafkaProducerFactory<>(configProps);
}
@Bean
public ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
configProps.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return new DefaultKafkaConsumerFactory<>(configProps);
}
}
事务支持
启用 Kafka 事务需在配置中添加:
spring.kafka.producer.transaction-id-prefix=kafka-tx-
代码中使用事务:
@Transactional
public void sendWithTransaction(String message) {
kafkaTemplate.send("test-topic", message);
// 其他数据库操作
}
错误处理
自定义错误处理器:
@Bean
public ConsumerAwareListenerErrorHandler listenErrorHandler() {
return (message, exception, consumer) -> {
System.err.println("Error occurred: " + exception.getMessage());
return null;
};
}
在 @KafkaListener 中指定错误处理器:
@KafkaListener(topics = "test-topic", errorHandler = "listenErrorHandler")
public void listenWithHandler(String message) {
// 消费逻辑
}
性能优化
调整生产者批处理参数提升吞吐量:
spring.kafka.producer.batch-size=16384
spring.kafka.producer.linger-ms=50
spring.kafka.producer.buffer-memory=33554432
监控与管理
集成 Micrometer 监控 Kafka 指标:
<dependency>
<groupId>io.micrometer</groupId>
<artifactId>micrometer-registry-prometheus</artifactId>
</dependency>
配置暴露指标端点:
management.endpoints.web.exposure.include=health,info,metrics
安全配置
启用 SASL/SSL 认证:
spring.kafka.properties.security.protocol=SASL_SSL
spring.kafka.properties.sasl.mechanism=PLAIN
spring.kafka.properties.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="admin-pass";
测试验证
编写集成测试验证功能:
@SpringBootTest
@EmbeddedKafka(topics = {"test-topic"})
class KafkaIntegrationTest {
@Autowired
private KafkaTemplate<String, String> template;
@Test
void testSendAndReceive() throws Exception {
template.send("test-topic", "test-message");
// 验证消费逻辑
}
}
5G.okacbd001.asia/PoSt/1123_141528.HtM
5G.okacbd002.asia/PoSt/1123_456916.HtM
5G.okacbd003.asia/PoSt/1123_218398.HtM
5G.okacbd004.asia/PoSt/1123_889831.HtM
5G.okacbd005.asia/PoSt/1123_123491.HtM
5G.okacbd006.asia/PoSt/1123_714080.HtM
5G.okacbd007.asia/PoSt/1123_537060.HtM
5G.okacbd008.asia/PoSt/1123_136079.HtM
5G.okacbd009.asia/PoSt/1123_616650.HtM
5G.okacbd010.asia/PoSt/1123_663785.HtM
5G.okacbd001.asia/PoSt/1123_388313.HtM
5G.okacbd002.asia/PoSt/1123_905791.HtM
5G.okacbd003.asia/PoSt/1123_536005.HtM
5G.okacbd004.asia/PoSt/1123_487053.HtM
5G.okacbd005.asia/PoSt/1123_818034.HtM
5G.okacbd006.asia/PoSt/1123_048305.HtM
5G.okacbd007.asia/PoSt/1123_798869.HtM
5G.okacbd008.asia/PoSt/1123_409259.HtM
5G.okacbd009.asia/PoSt/1123_170690.HtM
5G.okacbd010.asia/PoSt/1123_644323.HtM
5G.okacbd001.asia/PoSt/1123_331509.HtM
5G.okacbd002.asia/PoSt/1123_066717.HtM
5G.okacbd003.asia/PoSt/1123_488168.HtM
5G.okacbd004.asia/PoSt/1123_563337.HtM
5G.okacbd005.asia/PoSt/1123_132601.HtM
5G.okacbd006.asia/PoSt/1123_417110.HtM
5G.okacbd007.asia/PoSt/1123_185317.HtM
5G.okacbd008.asia/PoSt/1123_659313.HtM
5G.okacbd009.asia/PoSt/1123_046900.HtM
5G.okacbd010.asia/PoSt/1123_055999.HtM
5G.okacbd001.asia/PoSt/1123_454288.HtM
5G.okacbd002.asia/PoSt/1123_138956.HtM
5G.okacbd003.asia/PoSt/1123_665929.HtM
5G.okacbd004.asia/PoSt/1123_390362.HtM
5G.okacbd005.asia/PoSt/1123_934041.HtM
5G.okacbd006.asia/PoSt/1123_368995.HtM
5G.okacbd007.asia/PoSt/1123_654313.HtM
5G.okacbd008.asia/PoSt/1123_289539.HtM
5G.okacbd009.asia/PoSt/1123_628110.HtM
5G.okacbd010.asia/PoSt/1123_296733.HtM
5G.okacbd001.asia/PoSt/1123_973770.HtM
5G.okacbd002.asia/PoSt/1123_268676.HtM
5G.okacbd003.asia/PoSt/1123_810237.HtM
5G.okacbd004.asia/PoSt/1123_755930.HtM
5G.okacbd005.asia/PoSt/1123_591973.HtM
5G.okacbd006.asia/PoSt/1123_264688.HtM
5G.okacbd007.asia/PoSt/1123_131208.HtM
5G.okacbd008.asia/PoSt/1123_015802.HtM
5G.okacbd009.asia/PoSt/1123_495775.HtM
5G.okacbd010.asia/PoSt/1123_168548.HtM
5G.okacbd001.asia/PoSt/1123_783236.HtM
5G.okacbd002.asia/PoSt/1123_336290.HtM
5G.okacbd003.asia/PoSt/1123_861193.HtM
5G.okacbd004.asia/PoSt/1123_138945.HtM
5G.okacbd005.asia/PoSt/1123_193786.HtM
5G.okacbd006.asia/PoSt/1123_737250.HtM
5G.okacbd007.asia/PoSt/1123_755017.HtM
5G.okacbd008.asia/PoSt/1123_125326.HtM
5G.okacbd009.asia/PoSt/1123_336906.HtM
5G.okacbd010.asia/PoSt/1123_507653.HtM
5G.okacbd001.asia/PoSt/1123_488912.HtM
5G.okacbd002.asia/PoSt/1123_924648.HtM
5G.okacbd003.asia/PoSt/1123_231616.HtM
5G.okacbd004.asia/PoSt/1123_433132.HtM
5G.okacbd005.asia/PoSt/1123_960095.HtM
5G.okacbd006.asia/PoSt/1123_565638.HtM
5G.okacbd007.asia/PoSt/1123_753220.HtM
5G.okacbd008.asia/PoSt/1123_074826.HtM
5G.okacbd009.asia/PoSt/1123_932794.HtM
5G.okacbd010.asia/PoSt/1123_082082.HtM
5G.okacbd001.asia/PoSt/1123_214017.HtM
5G.okacbd002.asia/PoSt/1123_805485.HtM
5G.okacbd003.asia/PoSt/1123_230895.HtM
5G.okacbd004.asia/PoSt/1123_581775.HtM
5G.okacbd005.asia/PoSt/1123_340000.HtM
5G.okacbd006.asia/PoSt/1123_510979.HtM
5G.okacbd007.asia/PoSt/1123_211618.HtM
5G.okacbd008.asia/PoSt/1123_163516.HtM
5G.okacbd009.asia/PoSt/1123_316799.HtM
5G.okacbd010.asia/PoSt/1123_640406.HtM