kafka学习 -- 客户端Consumer API学习

写在前面

这里我会总结一下,Springboot 集成 spring-kafka中,consumer 的相关配置,Api

这里的东西,比 Producer 稍微多一些

集成相关配置

server:
  port: 9000
spring:
  kafka:
    bootstrap-servers: 192.168.1.74:9092
    consumer:
      group-id: group_id
      # 手动提交
      enable-auto-commit: false
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      properties:
        session.timeout.ms: 60000
    listener:
      type: batch
      log-container-config: false
      concurrency: 3
      # 手动提交
      ack-mode: manual_immediate

一、@KafkaListener

Spring Kafka 集成Kafka,消息的监听者配置,使用特别简单,只有一个注解即可实现监听

虽然只有一个注解配置,需要注意消息的序列化,以及相关监听处理

二、代码示例

示例一

    @KafkaListener(topics = "users", groupId = "group_id")
    public void consume(String message, Acknowledgment acknowledgment) throws IOException {
   
        try {
   
            logger.info(String.format("#### -> Consumed message -> %s", message));
        } catch (Exception e) {
   
            logger.error(e.getMessage(), e);
        } finally {
   
            // 手动提交 offset
            acknowledgment.acknowledge();
        }
    }

示例二

import com.common.Bar2;
import com.common.Foo2;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaHandler;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

/** * 这里定义了消费者组,和多个 topic 的对应 * * @author Gary Russell * @since 5.1 */
@Component
@KafkaListener(id = "multiGroup", topics = {
   "foos", "bars", "test"})
public class MultiMethods {
   

    private final Logger logger = LoggerFactory.getLogger(MultiMethods.class);

    @KafkaHandler
    public void foo(Foo2 foo) {
   
        System.out.println("Received: " + foo);
    }

    @KafkaHandler
    public void bar(Bar2 bar) {
   
        System.out.println("Received: " + bar);
    }

    @KafkaHandler(isDefault = true)
    public void unknown(Object object) {
   
        System.out.println("Received unknown: " + object);
    }


    @KafkaHandler
    public void tests(ConsumerRecord record) {
   
        logger.info("Received -> key :{} ", record.key());
    }

}

三、更多Kafka,,客户端、服务端,监控可参考下文

全部评论

相关推荐

点赞 评论 收藏
分享
自从我室友在计算机导论课上听说了“刷 LeetCode 是进入大厂的敲门砖”,整个人就跟走火入魔了一样。他在宿舍门口贴了一张A4纸,上面写着:“正在 DP,请勿打扰,否则 Time Limit Exceeded。”日记本的扉页被他用黑色水笔加粗描了三遍:“Talk is cheap. Show me the code。”连宿舍聚餐,他都要给我们讲解:“今天的座位安排可以用回溯算法解决,但为了避免栈溢出,我建议用动态规划。来,这是状态转移方程:dp[i][j] 代表第 i 个人坐在第 j 个位置的最优解。”我让他去楼下取个快递,他不直接去,非要在门口踱步,嘴里念念有词:“这是一个图的遍历问题。从宿舍楼(root)到驿站(target node),我应该用 BFS 还是 DFS?嗯,求最短路径,还是广度优先好。”和同学约好出去开黑,他会提前发消息:“集合点 (x, y),我们俩的路径有 k 个交点,为了最小化时间复杂度,应该在 (x/2, y/2) 处汇合。”有一次另一个室友低血糖犯了,让他帮忙找颗糖,他居然冷静地分析道:“别急,这是一个查找问题。零食箱是无序数组,暴力查找是 O(n)。如果按甜度排序,我就可以用二分查找,时间复杂度降到 O(log n)。”他做卫生也要讲究算法效率:“拖地是典型的岛屿问题,要先把连通的污渍区块都清理掉。倒垃圾可以用双指针法,一个指针从左往右,一个从右往左,能最快匹配垃圾分类。”现在我们宿舍的画风已经完全变了,大家不聊游戏和妹子,对话都是这样的:“你 Two Sum 刷了几遍了?”“别提了,昨天遇到一道 Hard 题,我连暴力解都想不出来,最后只能看题解。你呢?”“我动态规划还不行,总是找不到最优子结构。今天那道接雨水给我整麻了。”……LeetCode 真的害了我室友!!!
老六f:编程嘉豪来了
AI时代还有必要刷lee...
点赞 评论 收藏
分享
评论
点赞
收藏
分享

创作者周榜

更多
牛客网
牛客网在线编程
牛客网题解
牛客企业服务