工作机制
首领的另一个任务是搞清楚哪个跟随者的状态与自己是一致的。跟随者为了保持与首领的状态一致,在有新消息到达时尝试从首领那里复制消息, 不 过有各种原因会导致同步失败。例如,网络拥塞导致复制变慢, broker 发生崩演导致复制滞后,直到重启 broker 后复制才会继续。
为了与首领保持同步, 跟随者向首领发送获取数据的请求, 这种请求与消费者为了读取消息而发送的请求是一样的。首领将响应消息发给跟随者。请 求消息里包含了跟随者想要获取消息的偏移量, 而且这些偏移量总是有序的 。
一个跟随者副本先请求消息 1,接着请求消息 2,然后请求消息 3,在收到这 3 个请求的响应之前,它是不会发送第 4 个请求消息的。如果跟随者发送了请 求消息 4,那么首领就知道它已经收到了前面 3 个请求的响应。 通过査看每个跟随者请求的最新偏移量, 首领就会知道每个跟随者复制的进度。如果跟随者在 10s 内没有请求任何消息,或者虽然在请求消息,但在 10s 内没有请求最新的数据,那么它就会被认为是不同步的。如果一个副本无法与首领保持一致, 在首领发生失效时,它就不可能成为新首领,因为它没有包含全部的消息。
相反,持续请求得到的最新消息副本被称为同步副本。在首领发生失效时,只有同步副本才有可能被选为新首领。
处理请求的内部机制
broker 的大部分工作是处理客户端、分区副本和控制器发送给分区首领的请求。 Kafka 提供了一个二进制协议(基于 TCP),指定了请求消息的格式以及 broker 如何对请求作出响应——包括成功处理请求或在处理请求过程中遇到错误。
客户端发起连接并发送请求,broker 处理请求并作出响应。 broker 按照请求到达的顺序来处理它们这种顺序保证让 Kaka 具有了消息队列的特性,同时 保证保存的消息也是有序的。
所有的请求消息都包含一个标准消息头:
Request type(也就是 API key)
Request version( broker 可以处理不同版本的客户端请求,并根据客户端版本作出不同的响应)
Correlation id-一个具有唯一性的数字,用于标识请求消息,同时也会出现在响应消息和错误日志里(用于诊断问题)
Client Id 用于标识发送请求的客户端
broker 会在它所监听的每一个端口上运行一个 Acceptor 线程,这个线程会创建一个连接并把它交给 Processor 线程去处理。 Processor 线程(也被叫作 “网络线程”)的数量是可配置的。网络线程负责从客户端获取请求消息,把它们放进请求队列,然后从响应队列获取响应消息,把它们发送给客户端。
请求消息被放到请求队列后,IO 线程会负责处理它们。比较常见的请求类型有:
生产请求:生产者发送的请求,它包含客户端要写入 broker 的消息。
获取请求:在消费者和跟随者副本需要从 broker 读取消息时发送的请
生产请求和获取请求都必须发送给分区的首领副本。如果 broker 收到一个针对特定分区的请求,而该分区的首领在另一个 broker 上,那么发送请求 的客户端会收到一个“非分区首领”的错误响应。当针对特定分区的获取请求被发送到一个不含有该分区首领的 broker 上,也会出现同样的错误。Kafka 客 户端要自己负责把生产请求和获取请求发送到正确的 broker 上。
那么客户端怎么知道该往哪里发送请求呢?客户端使用了另一种请求类型,也就是元数据请求。这种请求包含了客户端感兴趣的主题列表。服务器 端的响应消息里指明了这些主题所包含的分区、每个分区都有哪些副本,以及哪个副本是首领。元数据请求可以发送给任意一个 broker ,因为所有 broker 都缓存了这些信息。
一般情况下,客户端会把这些信息缓存起来,并直接往目标 broker 上发送生产请求和获取请求。它们需要时不时地通过发送元数据请求来刷新这些 信息(刷新的时间间隔通过 metadata.max.age.ms 参数来配置,2.1.3 的客户端默认参数 30S),从而知道元数据是否发生了变更, 比如,在新 broker 加入集群时,部分副本会被移动到新的 broker 上。另外,如果客户端收到“非首领”错误,它会在尝试重发请求之前先刷新元数据,因为这个错误说明了客 户端正在使用过期的元数据信息,之前的请求被发到了错误的 broker 上。
生产请求
我们曾经说过, acks 这个配置参数,该参数指定了需要多少个 broker 确认才可以认为一个消息写入是成功的。不同的配置对“写入成功”的界定是 不一样的,如果 acks=1,那么只要首领收到消息就认为写入成功;如果 acks=all,那么需要所有同步副本收到消息才算写入成功; 如果 acks=0, 那么生产者在把 消息发出去之后, 完全不需要等待 broker 的响应。
包含首领副本的 broker 在收到生产请求时, 会对请求做一些验证。
•发送数据的用户是否有主题写入权限?
请求里包含的 acks 值是否有效(只允许出现 0、1 或 all) ?(ack=-1 等同于 ack=all)
如果acks=all, 是否有足够多的同步副本保证消息已经被安全写入? z 之后,消息被写入本地磁盘。在 Linux 系统上,消息会被写到文件系统缓存里,并不保证它们何时会被刷新到磁盘上。Kafka 不会一直等待数据被写到磁盘 上,它依赖复制功能来保证消息的持久性。
在消息被写入分区的首领之后, broker 开始检査 acks 配置参数一如果 acks 被设为 0 或 1, 那么 broker 立即返回响应;如果 acks 被设为 all,那么请求 会被保存在一个叫作炼狱的缓冲区里, 直到首领发现所有跟随者副本都复制了消息, 响应才会被返回给客户端。
获取请求
broker 处理获取请求的方式与处理生产请求的方式很相似。客户端发送请求,向 broker 请求主题分区里具有特定偏移量的消息, 好像在说: “请把主 题 Test 分区 0 偏移量从 53 开始的消息以及主题 Test 分区 3 偏移量从 64 开始的消息发给我。”客户端还可以指定 broker 最多可以从一个分区里返回多 少数据。 这个限制是非常重要的, 因为客户端需要为 broker 返回的数据分配足够的内存。 如果没有这个限制, broker 返回的大量数据有可能耗尽客户端 的内存。
我们之前讨论过,请求需要先到达指定的分区首领上,然后客户端通过査询元数据来确保请求的路由是正确的。首领在收到请求时,它会先检査请求是否 有效,比如,指定的偏移量在分区上是否存在?如果客户端请求的是已经被删除的数据,或者请求的偏移量不存在, 那么 broker 将返回一个错误。
如果请求的偏移量存在, broker 将按照客户端指定的数量上限从分区里读取消息, 再把消息返回给客户端。 Kafka 使用零复制技术向客户端发送消息 一一也就是说, Kafka 直接把消息从文件(或者更确切地说是 Linux 文件系统缓存)里发送到网络通道,而不需要经过任何中间缓冲区。 这是 Kafka 与其他大 部分数据库系统不一样的地方, 其他数据库在将数据发送给客户端之前会先把它们保存在本地缓存里。 这项技术避免了字节复制, 也不需要管理内存缓 冲区, 从而获得更好的性能。
客户端除了可以设置 broker 返回数据的上限, 也可以设置下限。 例如, 如果把下限设置为 10KB,就好像是在告诉 broker:“等到有 10KB 数据的时候再 把它们发送给我。”在主题消息流量不是很大的情况下,这样可以减少 CPU 和网络开销。 客户端发送一个请求, broker 等到有足够的数据时才把它们返回 给客户端, 然后客户端再发出情求, 而不是让客户端每隔几毫秒就发送一次请求,每次只能得到很少的数据甚至没有数据。对比这两种情况, 它们最终读取 的数据总量是一样的, 但前者的来回传送次数更少, 因此开销也更小。
当然,我们不会让客户端一直等待 broker 累积数据。在等待了一段时间之后,就可以把可用的数据拿回处理,而不是一直等待下去。所以,客户端可以定 义一个超时时间,告诉 broker: “如果你无法在 K 毫秒内累积满足要求的数据量, 那么就把当前这些数据返回给我。
ISR
并不是所有保存在分区首领上的数据都可以被客户端读取。大部分客户端只能读取已经被写入所有同步副本的消息。 分区首领知道每个消息会被复 制到哪个副本上, 在消息还没有被写入所有同步副本之前, 是不会发送给消费者的,尝试获取这些消息的请求会得到空的响应而不是错误。
因为还没有被足够多副本复制的消息被认为是“不安全”的,如果首领发生崩横,另一 个副本成为新首领,那么这些消息就丢失了。如果我们允许消 费者读取这些消息,可能就会破坏一致性。试想, 一个消费者读取并处理了这样的一个消息,而另一个消费者发现这个消息其实并不存在。所以,我们会等到 所有同步副本复制了这些消息,才允许消费者读取它们。这也意味着,如果 broker 间的消息复制因为某些原因变慢,那么消息到达消费者的时间也会随之变长 (因为我们会先等待消息复制完毕) 。延迟时间可以通过参数 replica. lag. time. max. ms 来配置, 它指定了副本在复制消息时可被允许的最大延迟时间。
Kafka 的数据复制是以 Partition 为单位的。而多个备份间的数据复制,通过 Follower 向 Leader 拉取数据完成。从一这点来讲,有点像 Master-Slave 方 案。不同的是,Kafka 既不是完全的同步复制,也不是完全的异步复制,而是基于 ISR 的动态复制方案。
ISR,也即 In-Sync Replica。每个 Partition 的 Leader 都会维护这样一个列表,该列表中,包含了所有与之同步的 Replica(包含 Leader 自己)。每次数 据写入时,只有 ISR 中的所有 Replica 都复制完,Leader 才会将其置为 Commit,它才能被 Consumer 所消费
这种方案,与同步复制非常接近。但不同的是,这个 ISR 是由 Leader 动态维护的。如果 Follower 不能紧“跟上”Leader,它将被 Leader 从 ISR 中移除, 待它又重新“跟上”Leader 后,会被 Leader 再次加加 ISR 中。每次改变 ISR 后,Leader 都会将最新的 ISR 持久化到 Zookeeper
至于如何判断某个 Follower 是否“跟上”Leader,不同版本的 Kafka 的策略稍微有些区别。
从0.9.0.0 版本开始,replica.lag.max.messages 被移除,故 Leader 不再考虑 Follower 落后的消息条数。另外,Leader 不仅会判断 Follower 是否在 replica.lag.time.max.ms 时间内向其发送 Fetch 请求,同时还会考虑 Follower 是否在该时间内与之保持同步。
在第一步中,Leader A 总共收到 3 条消息,但由于 ISR 中的 Follower 只同步了第 1 条消息(m1),故只有 m1 被 Commit,也即只有 m1 可被 Consumer 消费。此时 Follower B 与 Leader A 的差距是 1,而 Follower C 与 Leader A 的差距是 2,虽然有消息的差距,但是满足同步副本的要求保留在 ISR 中。同步 副本概念参见《复制》
在第二步中,由于旧的 Leader A 宕机,新的 Leader B 在 replica.lag.time.max.ms 时间内未收到来自 A 的 Fetch 请求,故将 A 从 ISR 中移除,此时 ISR={B, C}。同时,由于此时新的 Leader B 中只有 2 条消息,并未包含 m3(m3 从未被任何 Leader 所 Commit),所以 m3 无法被 Consumer 消费。
(上图中就是因为 acks 不为 all 或者-1,不全部复制,就会导致单台服务器宕机时的数据丢失 m3 丢失了)
使用 ISR 方案的原因
由于Leader 可移除不能及时与之同步的 Follower,故与同步复制相比可避免最慢的 Follower 拖慢整体速度,也即 ISR 提高了系统可用性。
ISR中的所有 Follower 都包含了所有 Commit 过的消息,而只有 Commit 过的消息才会被 Consumer 消费,故从 Consumer 的角度而言,ISR 中的所有 Replica 都始终处于同步状态,从而与异步复制方案相比提高了数据一致性。
ISR 相关配置说明
Broker 的 min.insync.replicas 参数指定了 Broker 所要求的 ISR 最小长度,默认值为 1。也即极限情况下 ISR 可以只包含 Leader。但此时如果 Leader 宕 机,则该 Partition 不可用,可用性得不到保
只有被ISR 中所有 Replica 同步的消息才被 Commit,但 Producer 发布数据时,Leader 并不需要 ISR 中的所有 Replica 同步该数据才确认收到数据。Producer 可以通过 acks 参数指定最少需要多少个 Replica 确认收到该消息才视为该消息发送成功。acks 的默认值是 1,即 Leader 收到该消息后立即告诉 Producer 收到该消息,此时如果在 ISR 中的消息复制完该消息前 Leader 宕机,那该条消息会丢失。而如果将该值设置为 0,则 Producer 发送完数据后,立即认为 该数据发送成功,不作任何等待,而实际上该数据可能发送失败,并且 Producer 的 Retry 机制将不生效。更推荐的做法是,将 acks 设置为 all 或者-1,此 时只有 ISR 中的所有 Replica 都收到该数据(也即该消息被 Commit),Leader 才会告诉 Producer 该消息发送成功,从而保证不会有未知的数据丢失。