备战大半年秋招资料分享-Kafka篇

备战秋招大半年,目前已经拿到offer上岸,将半年来的笔记分享给大家。更多 笔记涵盖  MYSQL、Elasticsearch、Kafka、设计模式、JVM、Java语言基础、集合原理、并发技术 。
需要的同学可以加我vx:uukiinternet 私发给你们哦。



Q1:Segment index文件为什么还要记录消息偏移量以及在log文件中位置的对应关系,一个Segment内不是顺序写入么?

Kafka是分布式消息系统,常用于应用解耦、异步消息等场景。基本架构如下:

Topic是面向Producer、Consumer的逻辑概念,底层使用追加文件,顺序写磁盘(效率高)的方式实现。一个Topic由多个partition组成,均匀分布在多个broker中,以实现对请求的负载均衡。对于每一个partition,在broker上会有对应的类似topic-0这样的文件目录存储消息(日志文件)。事实上,每一个partition又由更小的结构segment组成,这样做的目的一是为了更好的检索(相比于一整个大的partition文件,不同segment文件名都记录着上一个segment文件的最后偏移量信息,使得在检索时候可以使用二分查找来加快查找效率),其次是为了更细粒度的管理消息(例如将一些过期的消息清除等)。Segment结构又细分为index文件、log文件;Seg index文件记录着消息在log文件第几个位置以及对应log文件物理偏移地址,log文件真实存储消息本身以及消息偏移量。



复制原理与同步方式
Kafka以partition replicas 的方式来实现数据高可用。同一个partition的所有副本中会有一个是leader,其余是followers,在leader正常运行的情况,所有读写请求都是直接与leader通信;Kafka支持三种消息确认的方式以提供不同程度的数据安全保证:1)ack=1,当leader成功将消息写到本地日志,则返回客户端成功;2)ack=0,无需等待leader确认,此时吞吐量最大,但同时也面临数据丢失的风险。3)ack=-1,ISR同步副本中所有副本都成功同步leader的消息时,返回成功

ISR:出于高可用考虑,在所有followers中挑选若干follower与leader组成ISR同步副本队列,leader负责维护该队列,在ack=-1配置下,当ISR中所有副本都完成同步消息后才会返回客户端成功


HW(HighWaterMark): HW 俗称高水位,HighWatermark 的缩写,取一个 partition 对应的 ISR 中最小的 LEO 作为 HW,consumer 最多只能消费到 HW 所在的位置。另外每个 replica 都有 HW,leader 和 follower 各自负责更新自己的 HW 的状态。对于 leader 新写入的消息,consumer 不能立刻消费,leader 会等待该消息被所有 ISR 中的 replicas 同步后更新 HW,此时消息才能被 consumer 消费。这样就保证了如果 leader 所在的 broker 失效,该消息仍然可以从新选举的 leader 中获取。

LEO(LogEndOffeset): 示每个 partition 的 log 最后一条 Message 的位置。


Leader选举机制:
1、Controller Leader选举:在Kafka集群中会有一个或多个broker,其中有一个broker会被选举为控制器(Kafka Controller),它负责管理整个集群中所有分区和副本的状态等工作。比如当某个分区的leader副本出现故障时,由控制器负责为该分区选举新的leader副本。再比如当检测到某个分区的ISR集合发生变化时,由控制器负责通知所有broker更新其元数据信息。Kafka Controller的选举是依赖Zookeeper来实现的,在Kafka集群中哪个broker能够成功创建/controller这个临时(EPHEMERAL)节点他就可以成为Kafka Controller
2、Partition Leader选举:由Controller负责完成:创建分区、分区上线(原leader宕机)触发。选举原则从所有副本集(AR),顺序找到一个存活的且在ISR里的副本当做leader。极端情况下,全部副本下线,策略有两种,通过配置指定,1)等待ISR中副本恢复,可能等待时间久,但是数据安全得到保证。 2)等待任意非ISR副本恢复,面临数据丢失风险。


Consumer Rebalance:

触发时机: 1、组成员数发生变更  2、 订阅主题数发生变更。 3、 订阅主题的分区数发生变更

Rebalace 过程:消费者第一次加入消费者组时候,会向Coordinator发送一个joinGroup请求,第一个消费者会成为leader。leader从 群组协调器获取群组成员列表,然后给每一个消费者进行分配分区 Partition。有两种分配策略:Range 和 RoundRobin。
    • Range 策略,就是把若干个连续的分区分配给消费者,如存在分区 1-5,假设有 3 个消费者,则消费者 1 负责分区 1-2,消费者 2 负责分区 3-4,消费者 3 负责分区 5。
    • RoundRoin 策略,就是把所有分区逐个分给消费者,如存在分区 1-5,假设有 3 个消费者,则分区 1->消费 1,分区 2->消费者 2,分区 3>消费者 3,分区 4>消费者 1,分区 5->消费者 2。
  1. 群主分配完成之后,把分配情况发送给群组协调器。
  2. 群组协调器再把这些信息发送给消费者。每一个消费者只能看到自己的分配信息,只有群主知道所有消费者的分配信息

如果确定Coordinator:所有broker上面都会启动Coordinator组件,最终确定哪个生效负责rebalance的方法:根据consumer group name哈希到partition分区,最终落到哪个partition分区所在的broker的coordinator生效。参考链接:https://dunwu.github.io/bigdata-tutorial/kafka/kafka-rebalance.html#%E4%B8%80%E3%80%81%E4%BB%80%E4%B9%88%E6%98%AF-rebalance

所有消息的变更都是通过Zookeeper进行发现并且通知到对应的组件处理:1)brokers leader(controller)宕机,zk感知并重新选举  2)partition副本/leader宕机,zk感知并通知controller进行partition leader选举算法。  3)consumer group成员心跳消失,通知coordinator负责consumer rebalance

这里提下 Kafka 写数据的大致方式:先写操作系统的页缓存(Page Cache),然后由操作系统自行决定何时刷到磁盘。
因此 Kafka 达到高吞吐、低延迟的原因主要有以下 4 点
页缓存是在内存中分配的,所以消息写入的速度很快。
Kafka 不必和底层的文件系统进行交互,所有繁琐的 I/O 操作都由操作系统来处理。
Kafka 采用追加写的方式,避免了磁盘随机写操作。
使用以 sendfile 为代表的零拷贝技术提高了读取数据的效率。
PS: 使用页缓存而非堆内存还有一个好处,就是当 Kafka broker 的进程崩溃时,堆内存的数据会丢失,但是页缓存的数据依然存在,重启 Kafka broker 后可以继续提供服务。
页缓存以及零拷贝技术


常见问题:
1、数据积压:消费能力弱,未能在max.session.ms时间内完成消费,导致rebalance,partition分配到另一个consumer。可以修改max.poll.record和max.poll.interval.ms值解决。
2、重复消费:消费未能及时提交,出现rebalance。

核心配置
设置 replication.factor >= 3
unclean.leader.election.enable:如果unclean.leader.election.enable参数的值为false,那么就意味着非ISR中的副本不能够参与选举,此时无法进行新的选举,此时整个分区处于不可用状态。如果unclean.leader.election.enable参数的值为true,那么可以从非ISR集合中选举follower副本称为新的leader。
log.retention.{hours|minutes|ms}:控制一条消息数据被保存多长时间。ms 设置最高、minutes 次之、hours 最低。
Producer
设置 acks = all
buffer.memory:缓冲区大小,如果写入速度过快,超过缓冲区大小会导致生产者send阻塞,因此1500W每天,一秒173个数据,每个数据大概200KB,即每秒34.6M消息数据,设置500M缓存区大小。
message.max.byte:broker对于每条消息大小限制
batch.size:消息所属相同分区满足一个batch大小后才会发送,每个数据200KB,一个batch20个数据,即4M,内网机器传输时间可以接受。
linger.ms:不满batch.size时,多久发送batch。默认50ms
max.request.size:将多个batch一起发送,5个batch,20M左右。
Consumer
max.poll.record:一次获取最大的记录数
max.poll.interval.ms:两次poll最大时间间隔
heartbeat线程 每隔heartbeat.interval.ms向coordinator发送一个心跳包,证明自己还活着。只要 heartbeat线程 在 session.timeout.ms 时间内 向 coordinator发送过心跳包,那么 group coordinator就认为当前的kafka consumer是活着的。

Kafka分区数是不是越多越好?
一、客户端/服务器端需要使用的内存就越多  ,这个producer有个参数batch.size,默认是16KB。它会为每个分区缓存消息,一旦满了就打包将消息批量发出。看上去这是个能够提升性能的设计。不过很显然,因为这个参数是分区级别的,如果分区数越多,这部分缓存所需的内存占用也会更多。
二、文件句柄的开销。  每个分区在底层文件系统都有属于自己的一个目录。该目录下通常会有两个文件: base_offset.log和base_offset.index。Kafak的controller和ReplicaManager会为每个broker都保存这两个文件句柄(file handler)。很明显,如果分区数越多,所需要保持打开状态的文件句柄数也就越多,最终可能会突破你的ulimit -n的限制。

三. 如何正确选择kafka分区数

创建一个只有1个分区的topic,然后测试这个topic的producer吞吐量和consumer吞吐量。假设它们的值分别是Tp和Tc,单位可以是MB/s。然后假设总的目标吞吐量是Tt,那么分区数 =  Tt / max(Tp, Tc)



#互联网求职##学习路径#
全部评论

相关推荐

自学java狠狠赚一...:骗你点star的,港卵公司,记得把star收回去
点赞 评论 收藏
分享
评论
3
23
分享

创作者周榜

更多
牛客网
牛客网在线编程
牛客网题解
牛客企业服务