<span>SpringBoot2.x 整合Kafka</span>

环境准备

producer端maven依赖

    <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
    </dependency>

application.properties配置

## Spring整合kafka
spring.kafka.bootstrap-servers=192.168.21.107:9092,192.168.21.108:9092,192.168.21.109:9092
# kafka producer 发送消息失败时的重试次数
spring.kafka.producer.retries=3
# 批量发送数据的配置
spring.kafka.producer.batch-size=16384
# 设置kafka 生产者内存缓冲区的大小(32M)
spring.kafka.producer.buffer-memory=33554432
# kafka消息的序列化配置
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringDeserializer
# kafka 投递配置项
spring.kafka.producer.acks=1

生产端Service编写

kafkaProducerService.java

@Slf4j
@Component
public class KafkaProducerService {
    @Autowired
    private KafkaTemplate<String,Object> kafkaTemplate;

    public void sendMessage(String topic,Object object){
        ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(topic, object);

        future.addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {
            @Override
            public void onFailure(Throwable throwable) {
                log.error("发送消息失败:"+throwable.getMessage());
            }

            @Override
            public void onSuccess(SendResult<String, Object> result) {
                log.info("发送消息成功:"+result.toString());
            }
        });
    }
}

consumer端application.xml配置

# Spring整合kafka
spring.kafka.bootstrap-servers=192.168.21.107:9092,192.168.21.108:9092,192.168.21.109:9092
# kafka consumer 消息的签收机制:手工签收
spring.kafka.consumer.enable-auto-commit=false
# 手工签收
spring.kafka.listener.ack-mode=manual
# latest[默认]:在偏移量无效的情况下,消费者从最新的记录开始读取数据
# earliest: 在偏移量无效的情况下,消费者从起始位置读取分区的进度
spring.kafka.consumer.auto-offset-reset=earliest

# 序列化配置
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization

#并行度
spring.kafka.listener.concurrency=5

消费端Service

KafkaConsumerService.java

@KafkaListener(groupId = "group02",topics = "topic02")
    public void onMessage(ConsumerRecord<String,Object> record, Acknowledgment acknowledgment, Consumer<?,?> consumer){
        log.info("消费端接收消息:{}",record.value());
        record.value();
        //手工签收机制
        acknowledgment.acknowledge();
    }
全部评论

相关推荐

头像 会员标识 头像
昨天 17:17
已编辑
西安交通大学 数学类
BG&nbsp;双9,因为之前一直考虑想出国想读博士,因此研二一直在被导催着写论文,一直写到五月初,大部分工作都做的差不多了,五一回去跟女朋友一直在玩儿,出于很多原因想直接就业(想赚钱摆烂),临时开始找实习,因为只做过华为的横向(理论),加上完全从五月初才开始刷leetcode,本来本科只浅浅学了一下c++,后来发现python做起来简单多了,开始准备python和leetcode去试一下算法岗(研究方向是优化理论相关的)。5.10&nbsp;过了简历5.15&nbsp;当晚有临时有汇报,让调了一下机试时间5.22&nbsp;机试三道题,一道最大公共子串动态规划,一道图模型用的dijstra解决,第三道不太会就没写了,机试300分5.30&nbsp;下午五点半技术面试,面试官对我导在的实验室有了解,对我刚弄完的论文很有兴趣,但其实非常理论,做的快速算法,主要是理论工作,直接找了之前组会ppt详细讲了一下,面试官感觉很满意,场面非常愉快。最后撕了个算法,leetcode&nbsp;714,也是动态规划,写了状态方程说了一下返回值,面试官直接就让过了。5.30&nbsp;下午六点半主管面,问了一下项目相关,之后主要是问有没有经历什么挫折,如何看待团队合作,有没有从0到1学习一些新知识体系,如何看待华为文化之类的。5.31&nbsp;晚上八点主管面过,问了一下HR,说是端午前后开奖,看看华子怎么说了,能有实习刷简历最好,不行直接回组里先做横向去了。其实从我五月初开始第一次用牛客和leetcode,感觉看牛客真是给自己添加了不少焦虑,焦虑拉满,毕竟非科班加起步晚,都感觉自己不用考虑实习了,后来跟女朋友说到这个事,得到了我的小天使的超级开解,让我觉得人生其实容错率很高,希望焦虑的同学们放宽心一些,觉得懒了就向上看,觉得累了就向下看,大部分时间平视就ok了。6.1&nbsp;有这么多人看吗,感觉大家貌似对找实习流程都不是很感兴趣,关注点不太对哦。看大家对我对象都很感兴趣,也不是不可以再分享一些被爱的小经历。从大二还比较青涩的时光到去年我的外院小女友去兼职雅思教师,我也是从强撑大男子主义到拥抱了吃软饭的新天地。其实本来只是想发个动态截个图打趣一下我对象,也不乏一些小小炫耀的心思。真心希望各位牛友在使自己优秀的同时(看牛客是真的焦虑)也能遇到那个真心爱你的女孩。有熟人看到吱一声,马上删
投递华为等公司10个岗位
点赞 评论 收藏
转发
点赞 收藏 评论
分享
牛客网
牛客企业服务