<span>SpringBoot2.x 整合Kafka</span>

环境准备

producer端maven依赖

    <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
    </dependency>

application.properties配置

## Spring整合kafka
spring.kafka.bootstrap-servers=192.168.21.107:9092,192.168.21.108:9092,192.168.21.109:9092
# kafka producer 发送消息失败时的重试次数
spring.kafka.producer.retries=3
# 批量发送数据的配置
spring.kafka.producer.batch-size=16384
# 设置kafka 生产者内存缓冲区的大小(32M)
spring.kafka.producer.buffer-memory=33554432
# kafka消息的序列化配置
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringDeserializer
# kafka 投递配置项
spring.kafka.producer.acks=1

生产端Service编写

kafkaProducerService.java

@Slf4j
@Component
public class KafkaProducerService {
    @Autowired
    private KafkaTemplate<String,Object> kafkaTemplate;

    public void sendMessage(String topic,Object object){
        ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(topic, object);

        future.addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {
            @Override
            public void onFailure(Throwable throwable) {
                log.error("发送消息失败:"+throwable.getMessage());
            }

            @Override
            public void onSuccess(SendResult<String, Object> result) {
                log.info("发送消息成功:"+result.toString());
            }
        });
    }
}

consumer端application.xml配置

# Spring整合kafka
spring.kafka.bootstrap-servers=192.168.21.107:9092,192.168.21.108:9092,192.168.21.109:9092
# kafka consumer 消息的签收机制:手工签收
spring.kafka.consumer.enable-auto-commit=false
# 手工签收
spring.kafka.listener.ack-mode=manual
# latest[默认]:在偏移量无效的情况下,消费者从最新的记录开始读取数据
# earliest: 在偏移量无效的情况下,消费者从起始位置读取分区的进度
spring.kafka.consumer.auto-offset-reset=earliest

# 序列化配置
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization

#并行度
spring.kafka.listener.concurrency=5

消费端Service

KafkaConsumerService.java

@KafkaListener(groupId = "group02",topics = "topic02")
    public void onMessage(ConsumerRecord<String,Object> record, Acknowledgment acknowledgment, Consumer<?,?> consumer){
        log.info("消费端接收消息:{}",record.value());
        record.value();
        //手工签收机制
        acknowledgment.acknowledge();
    }
全部评论

相关推荐

不愿透露姓名的神秘牛友
06-27 15:07
点赞 评论 收藏
分享
06-27 18:45
中山大学 Ruby
25届应届毕业生,来广州2个礼拜了,找不到工作,绝望了,太难过了…
应届想染班味:9爷找不到工作只能说明,太摆了或者太挑了。
点赞 评论 收藏
分享
二月份来北京实习,虽然提前做了攻略,但是是人生第一次经历租房,全程自己搞定,所以难免还是踩坑了😅奉劝大家,租房不要只看自己的房间如何,还要看别人的房门口环境如何因为我就是那个倒霉蛋,我旁边房间额门口堆了一大堆杂物,都是另一个房间的人放在外面的,而且他门口放了几十双鞋子,冬天还好,现在是夏天,可太味了,怎么有这么多鞋啊啊啊啊,请看图片O(≧口≦)O一开始这屋里是一个人住,后来变成两个人住(他说他妈妈来北京看病暂时住,ok能体谅的)但是大概一个多月以后他妈妈回家了,无缝衔接了一位女朋友接着住,而且他的女朋友巨能买东西,我真的不得不吐槽,🥲我不管别人怎么花钱的,但是你买东西你起不来,你能不能换个时间约送上门,总是在周内或者周末的某一天早上七点多,没到起来的时间,快递员框框敲门了!!!!而且经常点外卖但是我们楼下有门禁,外卖员按响铃他俩不去解锁,一直等一直等,等到我们其他人受不了去帮解锁,吵得要死,他们像聋了一样!!!服啦!!!我的房租一个月不到1600,但是是阳隔,很不隔音,隔壁的大哥有时候会半夜吃薯片或者嗑瓜子(凌晨两三点的时候)好几次我都从梦里被嗑出来了🥲还好还剩两个月就实习结束可以回家了,呜呜想家,想我自己的窝😭
码农索隆:这也是我不喜欢合租的原因
我的租房踩坑经历
点赞 评论 收藏
分享
评论
点赞
收藏
分享

创作者周榜

更多
牛客网
牛客网在线编程
牛客网题解
牛客企业服务