智乐活

Kafka 内部工作原理

2018/07/23 Share

了解 Kafka 的内部工作原理有助于理解 Kafka 的行为和诊断问题,本篇文章主要从集群成员关系、控制器、复制、处理请求和存储等方面介绍 Kafka 的内部机制。

集群成员关系

Kafka 使用 ZooKeeper 来维护集群内成员的信息,每个 broker 在启动时都通过在 /brokers/ids 路径创建临时节点将自己的唯一标示 broker.id 注册到 ZooKeeper。

Kafka 会订阅 ZooKeeper 的 /brokers/ids 路径,在有 broker 加入或退出集群时收到通知。

关闭 broker 时对应的临时节点会消失,不过它的 id 会继续存储在其他数据中,如主题的副本列表。完全关闭一个 broker 后,如果使用相同 id 启动一个全新 broker,它会立即加入集群,并拥有和旧 broker 相同的分区和主题。

控制器

控制器其实是一个 broker,除一般 broker 的功能外,还负责选举分区首领。

集群内的 broker 通过创建临时节点 /controller 使自己成为控制器,如果收到【节点已存在】异常,即控制器已存在,会创建 ZooKeeper 的 watch 对象,在节点变更时收到通知、再次尝试成为控制器,以确保集群内始终存在一个控制器。

新选出来的控制器会用一个递增的值更新 /controller_epoch 节点,在其他 broker 收到 controller epoch 的值后,如果收到包含旧的 epoch 的来自控制器的消息就会选择忽略,以此来避免脑裂(多个控制器)。

当控制器通过 ZooKeeper 路径发现集群内的 broker 离开时,会遍历那些失去首领的分区,在分区副本列表中选择 broker 成为新的首领,然后向所有包含新首领和现有跟随者的 broker 发送请求,传递新首领及跟随者的信息。随后,新首领开始处理来自生产者和消费者的请求,跟随者开始从新首领那里复制消息。

复制

复制是 Kafka 的核心功能之一,可以在个别节点失效时仍然保证 Kafka 的可用性和持久性。

Kafka 使用主题来组织数据,每个主题被分成若干个分区,而每个分区可以有多个副本。

副本分为首领副本和跟随者副本两种。每个分区有一个首领副本,所有生产者、消费者的请求都会经过首领副本。首领以外的副本都是跟随者副本,这些副本不处理来自生产者和消费者的请求,唯一的任务时从首领复制消息,在首领崩溃时成为新首领。

跟随者副本像消费者一样往首领副本发送请求,请求中包含了跟随者想要获取消息的偏移量,这些偏移量总是有序的。首领通过查看每个跟随者的最新偏移量知道每个跟随者复制的进度。如果跟随者在 10 s 内没有请求最新的数据,就会被认为是不同步的。只有持续请求最新消息的副本才是同步的副本,首领失效时只有同步副本才可能成为新首领。

除了当前首领外,每个分区都有一个首选首领,即创建主题时选定的首领。创建主题时会在 broker 之间均衡首领,所以当首选首领成为当前首领时,broker 间的负载会得到均衡。默认情况下,auto.leader.rebalance.enable 的值为 true,即如果检查到当前首领不是首选首领且首选首领是同步的,就会出发首领选举,让首选首领成为当前首领。

处理请求

broker 的大部分工作是处理客户端、分区副本和控制器发送给分区首领的请求,Kafka 提供一个基于 TCP 的二进制协议来指定请求和相应的格式,并按请求到达的顺序来处理。

broker 在监听的端口上运行 Acceptor 线程,线程创建连接后交给 Processor 线程去处理。Processor 线程将请求放入请求队列,IO 线程处理后将响应放入响应队列,再由 Processor 线程返回响应。

生产者和消费者的请求必须发送给分区的首领副本,对特定分区的获取请求也不能发送给不含该分区首领的 broker 上。客户端通过请求元数据来确定请求该发往何处,请求包含订阅的主题列表,响应包含主题的分区、副本信息。客户端会缓存元数据信息,通过 metadata.max.age.ms 可以配置刷新时间。如果 broker 收到不该发向自己的请求,会返回相应的错误,客户端收到错误后会刷新元数据并重发请求。

生产请求

生产者使用 acks 配置项来定义需要多少个 broker 确认才能认为消息写入成功。acks 为 0 时只需发送消息,不会等待 broker 确认;acks 为 1 时只要首领确认就算写入成功;acks 为 all 时需要所有 broker 确认才算成功。

broker 收到生产请求后,会判断用户是否有写入权限、检查 acks 的值是否有效,如果 acks 为 all 时还会检查是否有足够的 broker 保证消息能被安全写入。

在通过一些列的检查后,broker 会将消息写入到文件系统缓存中,不保证何时被刷新到磁盘上。Kafka 不会等待数据写入磁盘,而是依赖复制保证持久性。消息写入分区首领后,如果 acks 大于 1,则放入一个缓冲区,等待其他 broker 响应,否则直接返回给客户端。

获取请求

客户端向 broker 发送获取请求时,会将主题、偏移量等信息传给 broker,而 broker 则先检查请求是否有效,如偏移量是否存在,检查通过后使用零复制技术(直接从文件里发送到网络,不经过缓冲区,性能更好)返回给客户端。

如果客户端配置了请求数据下限,broker 在消息不够时会先等待,直到消息足够或到达等待时间上限时再返回。

需要注意的是,大部分客户端只能读取到已经被写入到全部同步副本的消息,没有被写入到全部同步副本的消息被认为是不安全的,如果首领崩溃,可能会破坏一致性。这也意味着如果 broker 间复制消息的速度变慢,消息到达消费者的时间也会变长,可以通过 replica.lag.time.max.ms 来配置复制消息时被允许的最大延迟时间。

物理存储

Kafka 的基本存储单元是分区,分区无法在多个 broker 间再细分,也无法在多个磁盘上再细分,所以分区大小受到单个挂载点可用空间的限制。Kafka 通过 log.dirs 来配置存储分区的目录。

分配分区

创建主题时,Kafka 会首先决定如何在 broker 间分配分区。在分配分区时,Kafka 会尽可能地将各分区副本在 broker 间均匀分配,首领分区也会尽可能地均匀分配,如果 broker 指定了机架信息,会尽可能地将各分区副本均匀地分配到不同机架。

分配完分区之后,Kafka 会根据各目录中的分区数量来决定分区使用哪个目录,新的分区总被添加到分区数最小的目录中。

文件管理

在一个大文件中查找和删除消息效率不高,所以 Kafka 把每个分区分成若干个片段。默认情况下每个片段包含 1 GB 或一周的数据,以较小的为准。写入数据时,如果达到上限,就关闭当前文件,打开一个新片段。

正在写入的数据片段称为活跃片段,活跃片段永远不会被删除。已经关闭的不活跃片段,会根据配置的最长保留时间或最大保存消息量来删除旧的消息片段。

文件格式

Kafka 的消息和偏移量都保存在文件中,磁盘内的消息、生产者发送过来的消息及发送给消费的消息是一样的,所以可以使用零复制技术发送给消费者。

另外消息中还包含了消息大小、校验和、消息格式版本号、压缩算法和时间戳,如果生产者发送的是压缩过的消息,同一个批次的消息会被压缩在一起,消费者在解压这个消息后,会看到整个批次的消息。

索引

消费者可以从任何偏移量位置读取消息,为了能更快地定位到指定偏移量,Kafka 为每个分区维护了一个索引,将偏移量映射到片段文件及在文件中的位置。

索引也被分成片段,删除消息时也会删除相应的索引。Kafka 不维护索引的校验和,如果索引出现损坏,Kafka 会重新读取消息来生成索引。有必要的话,删除索引的绝对安全的。

参考

Kafka 官方文档

Kafka 权威指南

CATALOG
  1. 1. 集群成员关系
  2. 2. 控制器
  3. 3. 复制
  4. 4. 处理请求
    1. 4.1. 生产请求
    2. 4.2. 获取请求
  5. 5. 物理存储
    1. 5.1. 分配分区
    2. 5.2. 文件管理
    3. 5.3. 文件格式
    4. 5.4. 索引
  6. 6. 参考