了解 Kafka 的内部工作原理有助于理解 Kafka 的行为和诊断问题,本篇文章主要从集群成员关系、控制器、复制、处理请求和存储等方面介绍 Kafka 的内部机制。
集群成员关系
Kafka 使用 ZooKeeper 来维护集群内成员的信息,每个 broker 在启动时都通过在 /brokers/ids 路径创建临时节点将自己的唯一标示 broker.id 注册到 ZooKeeper。
Kafka 会订阅 ZooKeeper 的 /brokers/ids 路径,在有 broker 加入或退出集群时收到通知。
关闭 broker 时对应的临时节点会消失,不过它的 id 会继续存储在其他数据中,如主题的副本列表。完全关闭一个 broker 后,如果使用相同 id 启动一个全新 broker,它会立即加入集群,并拥有和旧 broker 相同的分区和主题。
控制器
控制器其实是一个 broker,除一般 broker 的功能外,还负责选举分区首领。
集群内的 broker 通过创建临时节点 /controller 使自己成为控制器,如果收到【节点已存在】异常,即控制器已存在,会创建 ZooKeeper 的 watch 对象,在节点变更时收到通知、再次尝试成为控制器,以确保集群内始终存在一个控制器。
新选出来的控制器会用一个递增的值更新 /controller_epoch 节点,在其他 broker 收到 controller epoch 的值后,如果收到包含旧的 epoch 的来自控制器的消息就会选择忽略,以此来避免脑裂(多个控制器)。
当控制器通过 ZooKeeper 路径发现集群内的 broker 离开时,会遍历那些失去首领的分区,在分区副本列表中选择 broker 成为新的首领,然后向所有包含新首领和现有跟随者的 broker 发送请求,传递新首领及跟随者的信息。随后,新首领开始处理来自生产者和消费者的请求,跟随者开始从新首领那里复制消息。
复制
复制是 Kafka 的核心功能之一,可以在个别节点失效时仍然保证 Kafka 的可用性和持久性。
Kafka 使用主题来组织数据,每个主题被分成若干个分区,而每个分区可以有多个副本。
副本分为首领副本和跟随者副本两种。每个分区有一个首领副本,所有生产者、消费者的请求都会经过首领副本。首领以外的副本都是跟随者副本,这些副本不处理来自生产者和消费者的请求,唯一的任务时从首领复制消息,在首领崩溃时成为新首领。
跟随者副本像消费者一样往首领副本发送请求,请求中包含了跟随者想要获取消息的偏移量,这些偏移量总是有序的。首领通过查看每个跟随者的最新偏移量知道每个跟随者复制的进度。如果跟随者在 10 s 内没有请求最新的数据,就会被认为是不同步的。只有持续请求最新消息的副本才是同步的副本,首领失效时只有同步副本才可能成为新首领。
除了当前首领外,每个分区都有一个首选首领,即创建主题时选定的首领。创建主题时会在 broker 之间均衡首领,所以当首选首领成为当前首领时,broker 间的负载会得到均衡。默认情况下,auto.leader.rebalance.enable 的值为 true,即如果检查到当前首领不是首选首领且首选首领是同步的,就会出发首领选举,让首选首领成为当前首领。
处理请求
broker 的大部分工作是处理客户端、分区副本和控制器发送给分区首领的请求,Kafka 提供一个基于 TCP 的二进制协议来指定请求和相应的格式,并按请求到达的顺序来处理。
broker 在监听的端口上运行 Acceptor 线程,线程创建连接后交给 Processor 线程去处理。Processor 线程将请求放入请求队列,IO 线程处理后将响应放入响应队列,再由 Processor 线程返回响应。
生产者和消费者的请求必须发送给分区的首领副本,对特定分区的获取请求也不能发送给不含该分区首领的 broker 上。客户端通过请求元数据来确定请求该发往何处,请求包含订阅的主题列表,响应包含主题的分区、副本信息。客户端会缓存元数据信息,通过 metadata.max.age.ms 可以配置刷新时间。如果 broker 收到不该发向自己的请求,会返回相应的错误,客户端收到错误后会刷新元数据并重发请求。
生产请求
生产者使用 acks 配置项来定义需要多少个 broker 确认才能认为消息写入成功。acks 为 0 时只需发送消息,不会等待 broker 确认;acks 为 1 时只要首领确认就算写入成功;acks 为 all 时需要所有 broker 确认才算成功。
broker 收到生产请求后,会判断用户是否有写入权限、检查 acks 的值是否有效,如果 acks 为 all 时还会检查是否有足够的 broker 保证消息能被安全写入。
在通过一些列的检查后,broker 会将消息写入到文件系统缓存中,不保证何时被刷新到磁盘上。Kafka 不会等待数据写入磁盘,而是依赖复制保证持久性。消息写入分区首领后,如果 acks 大于 1,则放入一个缓冲区,等待其他 broker 响应,否则直接返回给客户端。
获取请求
客户端向 broker 发送获取请求时,会将主题、偏移量等信息传给 broker,而 broker 则先检查请求是否有效,如偏移量是否存在,检查通过后使用零复制技术(直接从文件里发送到网络,不经过缓冲区,性能更好)返回给客户端。
如果客户端配置了请求数据下限,broker 在消息不够时会先等待,直到消息足够或到达等待时间上限时再返回。
需要注意的是,大部分客户端只能读取到已经被写入到全部同步副本的消息,没有被写入到全部同步副本的消息被认为是不安全的,如果首领崩溃,可能会破坏一致性。这也意味着如果 broker 间复制消息的速度变慢,消息到达消费者的时间也会变长,可以通过 replica.lag.time.max.ms 来配置复制消息时被允许的最大延迟时间。
物理存储
Kafka 的基本存储单元是分区,分区无法在多个 broker 间再细分,也无法在多个磁盘上再细分,所以分区大小受到单个挂载点可用空间的限制。Kafka 通过 log.dirs 来配置存储分区的目录。
分配分区
创建主题时,Kafka 会首先决定如何在 broker 间分配分区。在分配分区时,Kafka 会尽可能地将各分区副本在 broker 间均匀分配,首领分区也会尽可能地均匀分配,如果 broker 指定了机架信息,会尽可能地将各分区副本均匀地分配到不同机架。
分配完分区之后,Kafka 会根据各目录中的分区数量来决定分区使用哪个目录,新的分区总被添加到分区数最小的目录中。
文件管理
在一个大文件中查找和删除消息效率不高,所以 Kafka 把每个分区分成若干个片段。默认情况下每个片段包含 1 GB 或一周的数据,以较小的为准。写入数据时,如果达到上限,就关闭当前文件,打开一个新片段。
正在写入的数据片段称为活跃片段,活跃片段永远不会被删除。已经关闭的不活跃片段,会根据配置的最长保留时间或最大保存消息量来删除旧的消息片段。
文件格式
Kafka 的消息和偏移量都保存在文件中,磁盘内的消息、生产者发送过来的消息及发送给消费的消息是一样的,所以可以使用零复制技术发送给消费者。
另外消息中还包含了消息大小、校验和、消息格式版本号、压缩算法和时间戳,如果生产者发送的是压缩过的消息,同一个批次的消息会被压缩在一起,消费者在解压这个消息后,会看到整个批次的消息。
索引
消费者可以从任何偏移量位置读取消息,为了能更快地定位到指定偏移量,Kafka 为每个分区维护了一个索引,将偏移量映射到片段文件及在文件中的位置。
索引也被分成片段,删除消息时也会删除相应的索引。Kafka 不维护索引的校验和,如果索引出现损坏,Kafka 会重新读取消息来生成索引。有必要的话,删除索引的绝对安全的。