Kafka消费过程关键源码解析
本文已收录在Github,关注我,紧跟本系列专栏文章,咱们下篇再续!
- 🚀 魔都架构师 | 全网30W技术追随者
- 🔧 大厂分布式系统/数据中台实战专家
- 🏆 主导交易系统百万级流量调优 & 车联网平台架构
- 🧠 AIGC应用开发先行者 | 区块链落地实践者
- 🌍 以技术驱动创新,我们的征途是改变世界!
- 👉 实战干货:编程严选网
1 案例引入
官方Consumer最简代码用例:
@Test
public void testConsumer1() {
Properties props = new Properties();
// 指定 Kafka 服务器的地址和端口
props.setProperty("bootstrap.servers", "localhost:9092");
// 消费者组的 ID,标识一组消费者实例
props.setProperty("group.id", "test");
// 启用自动提交偏移量(true),表示消费者会自动将已处理的消息偏移量提交到 Kafka
props.setProperty("enable.auto.commit", "true");
// 自动提交偏移量的时间间隔(1000ms)
props.setProperty("auto.commit.interval.ms", "1000");
// 指定键和值的反序列化器
// 这里使用 StringDeserializer,表示消息的键和值都是String
props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
// 创建 Consumer 实例
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
// 订阅 Topic
consumer.subscribe(Arrays.asList("foo", "bar"));
// 拉消息
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
System.out.printf("offset = %d, key = %s, value = %s%n",
record.offset(), record.key(), record.value());
}
}
}
简短代码背后牵涉很多问题,Consumer咋绑定特定分区?咋实现订阅topic?咋实现拉消息?
2 订阅流程
// topics :一个包含主题名称的集合,表示要订阅的主题列表
// listener : 用于处理消费者组的分区再平衡事件
@Override
public void subscribe(Collection<String> topics, ConsumerRebalanceListener listener) {
// 清除未分配主题的相关缓存数据,确保订阅新主题时不会受到旧数据的影响
fetcher.clearBufferedDataForUnassignedTopics(topics);
log.info("Subscribed to topic(s): {}", Utils.join(topics, separator: ", "));
// 重置订阅状态 将新的主题集合和监听器更新到消费者的订阅状态中。
// 如果订阅成功,返回 true。
if (this.subscriptions.subscribe(new HashSet<>(topics), listener)) {
// 请求更新元数据,以便消费者能够获取新主题的分区信息
metadata.requestUpdateForNewTopics();
}
}
订阅主流程主要更新如下关键属性:
- 订阅状态(SubscriptionState) - subscriptions 主要维护所订阅的topic和patition的消费位置等状态信息
- 元数据中的topic信息
metadata中维护了Kafka集群元数据的一个子集,包括集群的Broker节点、Topic和Partition在节点上分布,及聚焦的第二个问题:Coordinator给Consumer分配的Partition信息。
Metadata.requestUpdateForNewTopics()
请求更新元数据。
public synchronized int requestUpdateForNewTopics() {
// 覆盖上次刷新的时间戳以允许立即更新
this.lastRefreshMs = 0;
this.needPartialUpdate = true;
this.equivalentResponseCount = 0;
this.requestVersion++;
return this.updateVersion;
}
这里,并未真正发送更新元数据的请求,只是将需要更新元数据的标志位needUpdate
置true
。Kafka必须确保在第一次拉消息前元数据可用,即必须更新一次元数据,否则Consumer不知道应该去哪个Broker拉哪个Partition的消息。
3 拉消息流程
那元数据何时才真正更新呢?
拉消息时序图
KafkaConsumer#poll()方法中主要调用如下方法:
updateAssignmentMetadataIfNeeded()
更新元数据:
@Override
public boolean updateAssignmentMetadataIfNeeded(final Timer timer) {
return updateAssignmentMetadataIfNeeded(timer, true);
}
boolean updateAssignmentMetadataIfNeeded(final Timer timer, final boolean waitForJoinGroup) {
// 内部调用coordinator.poll()
if (coordinator != null && !coordinator.poll(timer, waitForJoinGroup)) {
return false;
}
return updateFetchPositions(timer);
}
poll()里又调
ConsumerNetworkClient#ensureFreshMetadata()
boolean ensureFreshMetadata(Timer timer) {
if (this.metadata.updateRequested() || this.metadata.timeToNextUpdate(timer.currentTimeMs()) == 0) {
return awaitMetadataUpdate(timer);
} else {
// the metadata is already fresh
return true;
}
}
ConsumerNetworkClient#awaitMetadataUpdate
public boolean awaitMetadataUpdate(Timer timer) {
int version = this.metadata.requestUpdate(false);
do {
// see!!!
poll(timer);
} while (this.metadata.updateVersion() == version && timer.notExpired());
return this.metadata.updateVersion() > version;
}
内部调用client.poll()方法,实现与Cluster通信,在Coordinator注册Consumer并拉取和更新元数据。
这些都是 client 类中方法,ConsumerNetworkClient封装了Consumer和Cluster之间所有网络通信的实现,完全异步实现类。没有维护任何线程。
待发送Request都存在unsent域:
public class ConsumerNetworkClient implements Closeable {
private final UnsentRequests unsent = new UnsentRequests();
Response存放在pendingCompletion域
// 请求完成后,它们将在调用之前转移到此队列
// 是为避免在持有该对象的监视器锁时调用它们,可能导致死锁
private final ConcurrentLinkedQueue<RequestFutureCompletionHandler> pendingCompletion = new ConcurrentLinkedQueue<>();
每次调用poll()时,在当前线程中发送所有待发送Request,处理所有收到Response。
异步设计
优势:无需维护用于异步发送的和处理响应的线程,并充分发挥批量处理优势。很少的线程实现高吞吐量。
劣势:极大增加了代码的复杂度。
poll
private ConsumerRecords<K, V> poll(final Timer timer) {
// 确保至少执行一次拉取逻辑,直到计时器过期。
do {
// 尝试更新分区的分配元数据(如处理 rebalance),但无需为等待加入消费者组而阻塞
updateAssignmentMetadataIfNeeded(timer, false);
// 向 broker 发送 Fetch 请求并等待响应,以获取数据
final Fetch<K, V> fetch = pollForFetches(timer);
// !fetch.isEmpty() 不仅表示拉取到了记录,也可能虽无记录,但消费位点前进了
if (!fetch.isEmpty()) {
// 在返回已拉取记录之前,可以先发送下一轮的 Fetch 请求。
// 即可实现流水线处理(pipelining),当用户在处理当前批次数据时,
// 网络层已经在获取下一批数据了,避免不必要等待。
//
// 因为消费位点(consumed position)此时已更新,
// 所以在返回记录给用户之前,须避免触发任何可能导致数据丢失的错误(如 wakeup)
if (sendFetches() > 0 || client.hasPendingRequests()) {
// 将待发送的请求(如此处新的 Fetch 请求)立即通过网络发送出去
client.transmitSends();
}
// 将拉取到的记录(fetch.records())和新的消费位点(fetch.nextOffsets())封装成 ConsumerRecords 对象
// 经过拦截器链(onConsume)处理后,最终返给用户
return this.interceptors.onConsume(new ConsumerRecords<>(fetch.records(), fetch.nextOffsets()));
}
} while (timer.notExpired());
}
pollForFetches()
/**
* 这是消费者拉取数据的核心内部方法。它负责与 Kafka Broker 通信以获取消息。
*
* @param timer 一个计时器,用于跟踪整个 poll() 操作的剩余时间,确保不会超出用户设置的总超时。
* @return 一个 Fetch 对象,它包含了从 Kafka 拉取到的消息记录。
*/
private Fetch<K, V> pollForFetches(Timer timer) {
// 1. 本次网络请求的最大等待时间 (pollTimeout),需权衡两个因素:
// a) 用户调用 poll() 时设置的总超时时间 (timer.remainingMs())
// b) 消费者心跳需要 (coordinator.timeToNextPoll(...))。消费者须在指定时间内向协调器发送心跳,
// 否则会被认为 "死亡" 并被踢出消费组,引发 rebalance
// 因此,真正的等待时间是这两者中的较小值,以确保既不超时,又能及时发送心跳。
// 如果 coordinator 为 null (即消费者没有加入任何消费组),则只需考虑用户设置的超时。
long pollTimeout = coordinator == null ? timer.remainingMs() :
Math.min(coordinator.timeToNextPoll(timer.currentTimeMs()), timer.remainingMs());
// 若缓存里有未读取消息,直接返回它们
final Fetch<K, V> fetch = fetcher.collectFetch();
// 构造拉消息的请求,并发送
sendFetches();
// We do not want to be stuck blocking in poll if we are missing some positions
// since the offset lookup may be backing off after a failure
// NOTE: the use of cachedSubscriptionHasAllFetchPositions means we MUST call
// updateAssignmentMetadataIfNeeded before this method.
if (!cachedSubscriptionHasAllFetchPositions && pollTimeout > retryBackoffMs) {
pollTimeout = retryBackoffMs;
}
Timer pollTimer = time.timer(pollTimeout);
// 发送网络请求拉消息,等待直到有消息返回或者超时
client.poll(pollTimer, () -> {
// since a fetch might be completed by the background thread, we need this poll condition
// to ensure that we do not block unnecessarily in poll()
return !fetcher.hasAvailableFetches();
});
timer.update(pollTimer.currentTimeMs());
// 返回拉到的消息
return fetcher.collectFetch();
}
主要由fetcher.sendFetches()实现:
- 根据元数据的信息,构造所需Broker的拉消息的Request对象
- 然后调用
ConsumerNetworkClient#send
异步发送Request - 并且注册一个回调类处理返回的Response
所有返回的Response被暂时存放在
Fetcher#completedFetches
。注意,此时的Request并未被真正发给各Broker,而被暂存在client.unsend
等待发送。 - 然后,在调用
ConsumerNetworkClient#poll
时,会真正将之前构造的所有Request发送出去,并处理收到的Response - 最后,fetcher.fetchedRecords(),将返回的Response反序列化后转换为消息列表,返给调用者
4 总结
拉消息流程涉及关键类: