Spring Boot轻松玩转Kafka实战

Spring Boot 集成 Kafka 详解

Kafka 简介

Apache Kafka 是一个分布式流处理平台,具有高吞吐量、低延迟和可扩展性。常用于实时数据管道、消息队列和事件驱动架构。Spring Boot 通过 Spring Kafka 项目提供了与 Kafka 的无缝集成。

环境准备

确保已安装 JDK 8+、Maven 或 Gradle 以及 Kafka 环境。可通过以下命令启动 Kafka 本地服务:

# 启动 ZooKeeper
bin/zookeeper-server-start.sh config/zookeeper.properties

# 启动 Kafka
bin/kafka-server-start.sh config/server.properties

添加依赖

在 Spring Boot 项目中引入 Spring Kafka 依赖:

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

配置 Kafka

application.propertiesapplication.yml 中配置 Kafka 连接信息:

spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=my-group
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer

生产者实现

创建 Kafka 生产者发送消息:

@RestController
public class KafkaProducerController {
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @GetMapping("/send/{message}")
    public String sendMessage(@PathVariable String message) {
        kafkaTemplate.send("test-topic", message);
        return "Message sent: " + message;
    }
}

消费者实现

通过 @KafkaListener 注解实现消息消费:

@Service
public class KafkaConsumerService {
    @KafkaListener(topics = "test-topic", groupId = "my-group")
    public void listen(String message) {
        System.out.println("Received Message: " + message);
    }
}

高级配置

自定义生产者/消费者工厂:

@Configuration
public class KafkaConfig {
    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return new DefaultKafkaProducerFactory<>(configProps);
    }

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        configProps.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
        configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(configProps);
    }
}

事务支持

启用 Kafka 事务需在配置中添加:

spring.kafka.producer.transaction-id-prefix=kafka-tx-

代码中使用事务:

@Transactional
public void sendWithTransaction(String message) {
    kafkaTemplate.send("test-topic", message);
    // 其他数据库操作
}

错误处理

自定义错误处理器:

@Bean
public ConsumerAwareListenerErrorHandler listenErrorHandler() {
    return (message, exception, consumer) -> {
        System.err.println("Error occurred: " + exception.getMessage());
        return null;
    };
}

@KafkaListener 中指定错误处理器:

@KafkaListener(topics = "test-topic", errorHandler = "listenErrorHandler")
public void listenWithHandler(String message) {
    // 消费逻辑
}

性能优化

调整生产者批处理参数提升吞吐量:

spring.kafka.producer.batch-size=16384
spring.kafka.producer.linger-ms=50
spring.kafka.producer.buffer-memory=33554432

监控与管理

集成 Micrometer 监控 Kafka 指标:

<dependency>
    <groupId>io.micrometer</groupId>
    <artifactId>micrometer-registry-prometheus</artifactId>
</dependency>

配置暴露指标端点:

management.endpoints.web.exposure.include=health,info,metrics

安全配置

启用 SASL/SSL 认证:

spring.kafka.properties.security.protocol=SASL_SSL
spring.kafka.properties.sasl.mechanism=PLAIN
spring.kafka.properties.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="admin-pass";

测试验证

编写集成测试验证功能:

@SpringBootTest
@EmbeddedKafka(topics = {"test-topic"})
class KafkaIntegrationTest {
    @Autowired
    private KafkaTemplate<String, String> template;

    @Test
    void testSendAndReceive() throws Exception {
        template.send("test-topic", "test-message");
        // 验证消费逻辑
    }
}

5G.okacbd001.asia/PoSt/1123_141528.HtM
5G.okacbd002.asia/PoSt/1123_456916.HtM
5G.okacbd003.asia/PoSt/1123_218398.HtM
5G.okacbd004.asia/PoSt/1123_889831.HtM
5G.okacbd005.asia/PoSt/1123_123491.HtM
5G.okacbd006.asia/PoSt/1123_714080.HtM
5G.okacbd007.asia/PoSt/1123_537060.HtM
5G.okacbd008.asia/PoSt/1123_136079.HtM
5G.okacbd009.asia/PoSt/1123_616650.HtM
5G.okacbd010.asia/PoSt/1123_663785.HtM
5G.okacbd001.asia/PoSt/1123_388313.HtM
5G.okacbd002.asia/PoSt/1123_905791.HtM
5G.okacbd003.asia/PoSt/1123_536005.HtM
5G.okacbd004.asia/PoSt/1123_487053.HtM
5G.okacbd005.asia/PoSt/1123_818034.HtM
5G.okacbd006.asia/PoSt/1123_048305.HtM
5G.okacbd007.asia/PoSt/1123_798869.HtM
5G.okacbd008.asia/PoSt/1123_409259.HtM
5G.okacbd009.asia/PoSt/1123_170690.HtM
5G.okacbd010.asia/PoSt/1123_644323.HtM
5G.okacbd001.asia/PoSt/1123_331509.HtM
5G.okacbd002.asia/PoSt/1123_066717.HtM
5G.okacbd003.asia/PoSt/1123_488168.HtM
5G.okacbd004.asia/PoSt/1123_563337.HtM
5G.okacbd005.asia/PoSt/1123_132601.HtM
5G.okacbd006.asia/PoSt/1123_417110.HtM
5G.okacbd007.asia/PoSt/1123_185317.HtM
5G.okacbd008.asia/PoSt/1123_659313.HtM
5G.okacbd009.asia/PoSt/1123_046900.HtM
5G.okacbd010.asia/PoSt/1123_055999.HtM
5G.okacbd001.asia/PoSt/1123_454288.HtM
5G.okacbd002.asia/PoSt/1123_138956.HtM
5G.okacbd003.asia/PoSt/1123_665929.HtM
5G.okacbd004.asia/PoSt/1123_390362.HtM
5G.okacbd005.asia/PoSt/1123_934041.HtM
5G.okacbd006.asia/PoSt/1123_368995.HtM
5G.okacbd007.asia/PoSt/1123_654313.HtM
5G.okacbd008.asia/PoSt/1123_289539.HtM
5G.okacbd009.asia/PoSt/1123_628110.HtM
5G.okacbd010.asia/PoSt/1123_296733.HtM
5G.okacbd001.asia/PoSt/1123_973770.HtM
5G.okacbd002.asia/PoSt/1123_268676.HtM
5G.okacbd003.asia/PoSt/1123_810237.HtM
5G.okacbd004.asia/PoSt/1123_755930.HtM
5G.okacbd005.asia/PoSt/1123_591973.HtM
5G.okacbd006.asia/PoSt/1123_264688.HtM
5G.okacbd007.asia/PoSt/1123_131208.HtM
5G.okacbd008.asia/PoSt/1123_015802.HtM
5G.okacbd009.asia/PoSt/1123_495775.HtM
5G.okacbd010.asia/PoSt/1123_168548.HtM
5G.okacbd001.asia/PoSt/1123_783236.HtM
5G.okacbd002.asia/PoSt/1123_336290.HtM
5G.okacbd003.asia/PoSt/1123_861193.HtM
5G.okacbd004.asia/PoSt/1123_138945.HtM
5G.okacbd005.asia/PoSt/1123_193786.HtM
5G.okacbd006.asia/PoSt/1123_737250.HtM
5G.okacbd007.asia/PoSt/1123_755017.HtM
5G.okacbd008.asia/PoSt/1123_125326.HtM
5G.okacbd009.asia/PoSt/1123_336906.HtM
5G.okacbd010.asia/PoSt/1123_507653.HtM
5G.okacbd001.asia/PoSt/1123_488912.HtM
5G.okacbd002.asia/PoSt/1123_924648.HtM
5G.okacbd003.asia/PoSt/1123_231616.HtM
5G.okacbd004.asia/PoSt/1123_433132.HtM
5G.okacbd005.asia/PoSt/1123_960095.HtM
5G.okacbd006.asia/PoSt/1123_565638.HtM
5G.okacbd007.asia/PoSt/1123_753220.HtM
5G.okacbd008.asia/PoSt/1123_074826.HtM
5G.okacbd009.asia/PoSt/1123_932794.HtM
5G.okacbd010.asia/PoSt/1123_082082.HtM
5G.okacbd001.asia/PoSt/1123_214017.HtM
5G.okacbd002.asia/PoSt/1123_805485.HtM
5G.okacbd003.asia/PoSt/1123_230895.HtM
5G.okacbd004.asia/PoSt/1123_581775.HtM
5G.okacbd005.asia/PoSt/1123_340000.HtM
5G.okacbd006.asia/PoSt/1123_510979.HtM
5G.okacbd007.asia/PoSt/1123_211618.HtM
5G.okacbd008.asia/PoSt/1123_163516.HtM
5G.okacbd009.asia/PoSt/1123_316799.HtM
5G.okacbd010.asia/PoSt/1123_640406.HtM

#牛客AI配图神器#

全部评论

相关推荐

不愿透露姓名的神秘牛友
11-20 10:05
点赞 评论 收藏
分享
11-21 15:13
已编辑
郑州大学 后端工程师
Java面试先知:我觉得还是去快手吧,第一份工作至少有大厂背书,快手两年后再跳回科大估计能比现在去科大翻一倍,况且科大据说入职即巅峰
点赞 评论 收藏
分享
评论
点赞
收藏
分享

创作者周榜

更多
牛客网
牛客网在线编程
牛客网题解
牛客企业服务