Kafka消费过程关键源码解析

本文已收录在Github关注我,紧跟本系列专栏文章,咱们下篇再续!

  • 🚀 魔都架构师 | 全网30W技术追随者
  • 🔧 大厂分布式系统/数据中台实战专家
  • 🏆 主导交易系统百万级流量调优 & 车联网平台架构
  • 🧠 AIGC应用开发先行者 | 区块链落地实践者
  • 🌍 以技术驱动创新,我们的征途是改变世界!
  • 👉 实战干货:编程严选网

1 案例引入

官方Consumer最简代码用例:

@Test
public void testConsumer1() {
    Properties props = new Properties();
    // 指定 Kafka 服务器的地址和端口
    props.setProperty("bootstrap.servers", "localhost:9092");
  	// 消费者组的 ID,标识一组消费者实例
    props.setProperty("group.id", "test");
  	// 启用自动提交偏移量(true),表示消费者会自动将已处理的消息偏移量提交到 Kafka
    props.setProperty("enable.auto.commit", "true");
  	// 自动提交偏移量的时间间隔(1000ms)
    props.setProperty("auto.commit.interval.ms", "1000");
  	// 指定键和值的反序列化器
  	// 这里使用 StringDeserializer,表示消息的键和值都是String
    props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

    // 创建 Consumer 实例
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

    // 订阅 Topic
    consumer.subscribe(Arrays.asList("foo", "bar"));

    // 拉消息
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
        for (ConsumerRecord<String, String> record : records) {
            System.out.printf("offset = %d, key = %s, value = %s%n",
                    record.offset(), record.key(), record.value());
        }
    }
}

简短代码背后牵涉很多问题,Consumer咋绑定特定分区?咋实现订阅topic?咋实现拉消息?

2 订阅流程

// topics :一个包含主题名称的集合,表示要订阅的主题列表
// listener : 用于处理消费者组的分区再平衡事件
@Override
public void subscribe(Collection<String> topics, ConsumerRebalanceListener listener) {
  // 清除未分配主题的相关缓存数据,确保订阅新主题时不会受到旧数据的影响
  fetcher.clearBufferedDataForUnassignedTopics(topics);
  log.info("Subscribed to topic(s): {}", Utils.join(topics, separator: ", "));
  // 重置订阅状态 将新的主题集合和监听器更新到消费者的订阅状态中。
  // 如果订阅成功,返回 true。
  if (this.subscriptions.subscribe(new HashSet<>(topics), listener)) {
      // 请求更新元数据,以便消费者能够获取新主题的分区信息
      metadata.requestUpdateForNewTopics();
  }
}

订阅主流程主要更新如下关键属性:

  1. 订阅状态(SubscriptionState) - subscriptions 主要维护所订阅的topic和patition的消费位置等状态信息
  2. 元数据中的topic信息

metadata中维护了Kafka集群元数据的一个子集,包括集群的Broker节点、Topic和Partition在节点上分布,及聚焦的第二个问题:Coordinator给Consumer分配的Partition信息。

Metadata.requestUpdateForNewTopics()

请求更新元数据。

public synchronized int requestUpdateForNewTopics() {
    // 覆盖上次刷新的时间戳以允许立即更新
    this.lastRefreshMs = 0;
    this.needPartialUpdate = true;
    this.equivalentResponseCount = 0;
    this.requestVersion++;
    return this.updateVersion;
}

这里,并未真正发送更新元数据的请求,只是将需要更新元数据的标志位needUpdatetrue。Kafka必须确保在第一次拉消息前元数据可用,即必须更新一次元数据,否则Consumer不知道应该去哪个Broker拉哪个Partition的消息。

3 拉消息流程

那元数据何时才真正更新呢?

拉消息时序图

KafkaConsumer#poll()方法中主要调用如下方法:

updateAssignmentMetadataIfNeeded()

更新元数据:

@Override
public boolean updateAssignmentMetadataIfNeeded(final Timer timer) {
    return updateAssignmentMetadataIfNeeded(timer, true);
}

boolean updateAssignmentMetadataIfNeeded(final Timer timer, final boolean waitForJoinGroup) {
  	// 内部调用coordinator.poll()
    if (coordinator != null && !coordinator.poll(timer, waitForJoinGroup)) {
        return false;
    }

    return updateFetchPositions(timer);
}

poll()里又调

ConsumerNetworkClient#ensureFreshMetadata()

boolean ensureFreshMetadata(Timer timer) {
    if (this.metadata.updateRequested() || this.metadata.timeToNextUpdate(timer.currentTimeMs()) == 0) {
        return awaitMetadataUpdate(timer);
    } else {
        // the metadata is already fresh
        return true;
    }
}

ConsumerNetworkClient#awaitMetadataUpdate

public boolean awaitMetadataUpdate(Timer timer) {
    int version = this.metadata.requestUpdate(false);
    do {
      	// see!!!
        poll(timer);
    } while (this.metadata.updateVersion() == version && timer.notExpired());
    return this.metadata.updateVersion() > version;
}

内部调用client.poll()方法,实现与Cluster通信,在Coordinator注册Consumer并拉取和更新元数据。

这些都是 client 类中方法,ConsumerNetworkClient封装了Consumer和Cluster之间所有网络通信的实现,完全异步实现类。没有维护任何线程。

待发送Request都存在unsent域:

public class ConsumerNetworkClient implements Closeable {

    private final UnsentRequests unsent = new UnsentRequests();

Response存放在pendingCompletion域

// 请求完成后,它们将在调用之前转移到此队列
// 是为避免在持有该对象的监视器锁时调用它们,可能导致死锁
private final ConcurrentLinkedQueue<RequestFutureCompletionHandler> pendingCompletion = new ConcurrentLinkedQueue<>();

每次调用poll()时,在当前线程中发送所有待发送Request,处理所有收到Response。

异步设计

优势:无需维护用于异步发送的和处理响应的线程,并充分发挥批量处理优势。很少的线程实现高吞吐量。

劣势:极大增加了代码的复杂度。

poll

private ConsumerRecords<K, V> poll(final Timer timer) {
    // 确保至少执行一次拉取逻辑,直到计时器过期。
    do {
        // 尝试更新分区的分配元数据(如处理 rebalance),但无需为等待加入消费者组而阻塞
        updateAssignmentMetadataIfNeeded(timer, false);

				 // 向 broker 发送 Fetch 请求并等待响应,以获取数据
        final Fetch<K, V> fetch = pollForFetches(timer);

        // !fetch.isEmpty() 不仅表示拉取到了记录,也可能虽无记录,但消费位点前进了
        if (!fetch.isEmpty()) {
            // 在返回已拉取记录之前,可以先发送下一轮的 Fetch 请求。
            // 即可实现流水线处理(pipelining),当用户在处理当前批次数据时,
            // 网络层已经在获取下一批数据了,避免不必要等待。
            //
            // 因为消费位点(consumed position)此时已更新,
            // 所以在返回记录给用户之前,须避免触发任何可能导致数据丢失的错误(如 wakeup)
            if (sendFetches() > 0 || client.hasPendingRequests()) {
                // 将待发送的请求(如此处新的 Fetch 请求)立即通过网络发送出去
                client.transmitSends();
            }
            // 将拉取到的记录(fetch.records())和新的消费位点(fetch.nextOffsets())封装成 ConsumerRecords 对象
            // 经过拦截器链(onConsume)处理后,最终返给用户
            return this.interceptors.onConsume(new ConsumerRecords<>(fetch.records(), fetch.nextOffsets()));
        }
    } while (timer.notExpired());
}

pollForFetches()

/**
 * 这是消费者拉取数据的核心内部方法。它负责与 Kafka Broker 通信以获取消息。
 *
 * @param timer 一个计时器,用于跟踪整个 poll() 操作的剩余时间,确保不会超出用户设置的总超时。
 * @return 一个 Fetch 对象,它包含了从 Kafka 拉取到的消息记录。
 */
private Fetch<K, V> pollForFetches(Timer timer) {
    // 1. 本次网络请求的最大等待时间 (pollTimeout),需权衡两个因素:
    //    a) 用户调用 poll() 时设置的总超时时间 (timer.remainingMs())
    //    b) 消费者心跳需要 (coordinator.timeToNextPoll(...))。消费者须在指定时间内向协调器发送心跳,
    //       否则会被认为 "死亡" 并被踢出消费组,引发 rebalance
    //    因此,真正的等待时间是这两者中的较小值,以确保既不超时,又能及时发送心跳。
    //    如果 coordinator 为 null (即消费者没有加入任何消费组),则只需考虑用户设置的超时。
    long pollTimeout = coordinator == null ? timer.remainingMs() :
            Math.min(coordinator.timeToNextPoll(timer.currentTimeMs()), timer.remainingMs());

    // 若缓存里有未读取消息,直接返回它们
    final Fetch<K, V> fetch = fetcher.collectFetch();

    // 构造拉消息的请求,并发送
    sendFetches();

    // We do not want to be stuck blocking in poll if we are missing some positions
    // since the offset lookup may be backing off after a failure

    // NOTE: the use of cachedSubscriptionHasAllFetchPositions means we MUST call
    // updateAssignmentMetadataIfNeeded before this method.
    if (!cachedSubscriptionHasAllFetchPositions && pollTimeout > retryBackoffMs) {
        pollTimeout = retryBackoffMs;
    }

    Timer pollTimer = time.timer(pollTimeout);
  	// 发送网络请求拉消息,等待直到有消息返回或者超时
    client.poll(pollTimer, () -> {
        // since a fetch might be completed by the background thread, we need this poll condition
        // to ensure that we do not block unnecessarily in poll()
        return !fetcher.hasAvailableFetches();
    });
    timer.update(pollTimer.currentTimeMs());
    // 返回拉到的消息
    return fetcher.collectFetch();
}

主要由fetcher.sendFetches()实现:

  1. 根据元数据的信息,构造所需Broker的拉消息的Request对象
  2. 然后调用ConsumerNetworkClient#send异步发送Request
  3. 并且注册一个回调类处理返回的Response 所有返回的Response被暂时存放在Fetcher#completedFetches。注意,此时的Request并未被真正发给各Broker,而被暂存在client.unsend等待发送。
  4. 然后,在调用ConsumerNetworkClient#poll时,会真正将之前构造的所有Request发送出去,并处理收到的Response
  5. 最后,fetcher.fetchedRecords(),将返回的Response反序列化后转换为消息列表,返给调用者

4 总结

拉消息流程涉及关键类:

全部评论

相关推荐

不愿透露姓名的神秘牛友
07-09 13:05
TMD找工作本来就烦,这东西什么素质啊😡
Beeee0927:hr是超雄了,不过也是有道理的
点赞 评论 收藏
分享
评论
点赞
9
分享

创作者周榜

更多
牛客网
牛客网在线编程
牛客网题解
牛客企业服务