1.KafKa核心组件

KafKa的核心功能模块:

1、 延迟操作组件;
2、 控制器;
3、 协调器;
4、 网络通信;
5、 日志管理器;
6、 副本管理器;
7、 动态配置管理器;
8、 心跳检测;

1.延迟操作组件

1.DelayedOperation

KafKa将一些不立即执行而要等待满足一定条件之后才触发完成的操作称为延迟操作,并将这类操作定义为一个抽象类DelayedOperation,具有延迟操作的类继承DelayedOperation

2.DelayedOperationPurgatory

https://www.jianshu.com/p/bbb1c4f45b4e

3.DelayedProduce

DelayedProduce的作用就是协助副本管理器在acks为-1的场景的时候,延迟回调responseCallback向生产者做出响应。具体表现在当消息追加到分区Leader副本之后,该分区各个Follower完成了与Leader副本消息同步之后再回调responseCallback给生产者;

4.DelayedFetch

DelayedProduce是在ProduceRequest处理中对生产者发送消息的延迟操作,自然DelayedFetch就是在FetchRequest处理的时候进行的延迟操作。在KafKa中只有消费者或者Follower副本会发起FetchRequest请求。

5.DelayedJoin

DelayedJoin是协助组协调器在消费组准备平衡操作的时候进行相应的处理。当消费组的状态转换为PreparingRebalance时候,即准备进行平衡操作,在组协调器prepareRebalance()方法中会创建一个DelayedJoin对象,并交由DelayedOperationPurgatory负责监视管理;

在消费组进行平衡操作时之所以需要使用DelayedJoin处理,是为了让组协调器等待当前消费组下所有的消费者都请求加入到消费组,即发起了JoinGroupRequest请求。每次组协调器处理完JoinGroupRequest的时候都会检测DelayedJoin是否满足了完成执行的条件;

6.DelayedHearbeat

DelayedHearbeat用于协调消费者与组协调器心跳检测相关的延迟操作,DelayedHearbeat相关功能的实现是调用GroupCoordinator的相应方法

7.DelayedCreateTopics

在创建主题的时候,需要为主题的每个分区分配到Leader之后,才调用回调函数将创建主题结果返回给客户端。

DelayedCreateTopics延迟操作是等待该主题的所有分区副本分配到Leader或者等待超时之后调用回调函数返回给客户端;

2.控制器

http://blog.csdn.net/zhanglh046/article/details/72821995

在启动KafKa集群的时候,每一个代理都会实例化并且启动一个KafKaController,并将该代理的brokerId注册到ZooKeeper的相应节点中。KafKa集群中各代理会根据选举机制选出其中一个代理作为Leader,即Leader控制器。当控制器发生宕机之后其他代理再次竞选出新的控制器。

控制器负责主题的创建与删除、分区和副本的管理以及代理故障转移处理等。

当一个代理被选举成为控制器的时候,该代理对应的KafkaController就会注册(Register)控制器相应额操作权限,同时标记自己是Leader。当代理不再成为控制器的时候,就要注销掉DeRegister相应的权限;

实现这些功能的程序入口是KafKa核心core模块下的kafka.controller.KafkaController类。

1.控制器初始化

每个代理在启动的时候都会实例化并启动一个KafkaController,KafKaController实例化的时候主要完成如下的工作;

2.控制器选举过程

每个代理启动的时候就会创建一个KafKaController实例,当KafKaController启动之后就会从所有的代理中选择一个代理作为控制器,控制器是所有代理的Leader**,因此这里也称之为Leader****选举。**

除了在启动的时候会导致选举之外,当控制器所在的代理发生故障或者ZooKeeper通过心跳机制感知控制器与自己的连接Session过期的时候,也会再次从所有代理中选出一个节点作为集群的控制器;

KafKa控制器的选举依赖于ZooKeeper。在集群的整个运行过程中,代理在ZooKeeper****不同节点上注册相应的监听器。各监听器各司其职,当所有监听的节点状态发生变化的时候就会触发相关的函数进行处理。

KafKa控制器选举的核心思想就是各代理通过争抢/controller节点请求写入到自身的信息,先成功写入的代理即为Leader**。**

3.故障转移

触发控制器进行选举有三种情况:

1、 在控制器启动的时候;
2、 当控制器发生故障转移的时候;
3、 当心跳检测超时的时候;

可以说控制器故障转移的本质就是控制权的转移,而控制权的转移也就是重新选出新的控制器。

4.代理上线和下线

1、 代理上线;

当有新的代理上线的时候,在代理启动的时候会向ZooKeeper的/brokers/ids节点下注册该代理的brokerId,此时会被副本状态机在ZooKeeper所注册的BrokerChangerListener监听器监听到该节点信息的变化

1、 代理下线;

当代理下线的时候,该代理在ZooKeeper的/brokers/ids节点注册的与该代理对应的节点将被删除,此时BrokerChangerListener的handleChildCHANGE方法将会被触发;

5.主题管理

控制器负责对主题、分区副本的管理操作。

分区和副本是主题的固有属性,因此在讲解控制器对主题管理的时候将同时讲解控制器对分区副本创建以及删除的管理操作。

控制器对分区、副本的管理在逻辑上体现在分区状态机以及副本状态机对ZooKeeper的/brokers/topics节点以及子节点注册的一系列监听器上;

1、 创建主题:;

当创建一个主题的时候,无论是通过KafKa API还是通过命令行创建主题,同步返回创建主题成功的时候,其实仅仅都是在ZooKeeper的/brokers/topics****节点成功创建了该主题对应的子节点。而服务端创建主题的操作是异步交由控制器去完成的。

1、 删除主题:

客户端执行删除主题的操作的时候仅仅是在ZooKeeper的/admin/delete_topics路径下创建一个与待删除主题同名的节点,返回该主题被标记为删除,保证本步操作成功执行的前提是配置项delete.topic.enable的值被设置为true

例如,删除主题“topic-foo”的节点,则客户端执行删除操作的时候会在/admin/delete_topics路径下创建一个名为“topic-foo” 的节点。而实际删除主题的逻辑是异步交由KafKa****控制器负责执行的;

6.分区管理

KafKa控制器(Leader控制器)对分区的管理包括对分区的创建以及删除的管理,分区Leader****选举的管理,分区自动平衡、分区副本重新分配的管理等等。

  • 分区平衡

在onControllerFailover操作的时候会启动一个分区自动平衡的定时任务,该定时任务会定期的检查集群上面各代理分布是否失去了平衡,该过程是通过调用控制器的checkAndTriggerPartitionRebalance方法完成的;

分区自动平衡需要保证分区的副本分配在不提供的代理节点上

  • 分区充分配

3.协调器

KafKa提供了消费者协调器(ConsumerCoordinator)、组协调器(GroupCoordinator)以及任务管理协调器(WorkCoordinator)3协调器(coordinator)。其中任务管理协调器被KafKa Connect用于对works的管理。

鉴于旧版高级消费者存在的问题,新版消费者进行了重新设计,为了解决消费者依赖ZooKeeper所带来的问题,KafKa在服务端引入了组协调器(GroupCoordinator),每个KafKaServer启动的时候都会创建一个GroupCoordinator实例,用于管理部分消费组和该消费组下每个消费者的消费偏移量

同时在客户端引入了消费者协调器(ConsumerCoordinator**),每个KafKaConsumer实例化的时候会实例化一个ConsumerCoordinator对象,消费者协调器负责同一个消费组下各消费者与服务端组协调器之间的通信;**

1.消费者协调器

消费者协调器(ConsumerCoordinator)是KafkaConsumer的一个成员变量,该KafKaConsumer通过消费者协调器与服务端的组协调器进行通信。由于消费者协调器是KafKaConsumer私有的,因此消费者协调器中存储的信息也只有与之对应的消费者可见,不同消费者是看不到彼此的消费者协调器中的信息的。

消费者协调器负责处理更新消费者缓存的Metadata请求,负责向组协调器发起加入消费组的请求,负责对本消费者加入消费组前、后相应的处理,负责请求离开消费组(如当消费者取消订阅的时候),还负责向组协调器发送提交消费偏移量的请求。并通过一个心跳检测定时任务来检测组协调器的运行状况,或者是让组协调器感知自己的运行状况。

同时Leader消费者的消费者协调器还负责执行分区的分配,当消费者协调器向组协调器请求加入消费组之后,组协调器会为同一个组下的消费者选出一个Leader;

通过这种方式,将分区分配的职责交由客户端自己处理,从而减轻了服务端的负担;

2.组协调器

组协调器(GroupCoordinator)负责对其管理的组员提交的相关请求进行处理,这里的组员即消费者。它负责管理与消费者之间建立连接,并从与之连接的消费者中选出一个消费者作为Leader消费者,Leader消费者负责消费者分区的分配,在SyncGroupRequest请求时发送给组协调器,组协调器会在请求处理后返回响应时下发给其管理的所有消费者。同时组协调器还管理与之连接的消费者的消费偏移量的提交,将每个消费者消费偏移量保存到KafKa的内部主题当中,并通过心跳检测来检测消费者与自己的连接状态;

1、 组协调器依赖的组件;

每一个KafkaServer启动的时候都会实例化并启动一个组协调器,每个组协调器负责一部分消费组的管理。

1、 消费者入组过程;
2、 消费偏移量管理;

新版的KafKaConsumer将消费偏移量保存到KafKa的一个内部主题中,当消费者正常运行或者进行平衡操作的时候都需要向组协调器提交当前的消费偏移量。

4.网络通信

在KafKaServer启动的时候,初始化并启动了一个SocketServer服务,用于接收客户端的连接、处理客户端请求、发送响应等。

同时创建了一个KafkaRequestHandlerPool用于管理KafkaRequestHandler。

SocketServer是基于java NIO实现的网络通信组件;

其线程模型为:

一个Acceptor线程负责接收客户端所有的连接;

N个Processor线程,每个Processor有多个Selector,负责从每个连接中读取请求;

M个Handler线程处理请求,并将产生的结果返回给Processor线程。而Handler是由KafkaRequestHanderPool管理,在Processor和Handler之间通过RequestChannel来缓冲请求,每个Handler从RequestChannel.requestQueue接受RequestChannel.Request,并把Request交由KafkaApis的Handler()方法处理,经过处理之后把对应的Response存进RequestChannel.responseQueues队列。

1.Acceptor

Acceptor是一个线程类,主要职责是监听并接受客户端的请求,建立和客户端的数据传输通道ServerSocketChannel,然后为客户端制定一个Processor;

2.Processor

也是一个线程类,主要用于从客户端读取请求数据和将相应的响应结果返回给客户端。

3.RequestChannel

RequestChannel是为了给Processor线程与Handler线程之间通信提供数据缓冲,是通信过程中Request和Response缓存的通道,是ProCSSor线程与Handler线程交换数据的地方;

4.SocketServer启动过程

在启动一个KafKa代理的时候会实例化并启动一个SocketServer服务;

SocketServer启动之后就可以通过Acceptor接收客户端的请求;

5.日志管理器

1. KafKa日志结构

KafKa消息是以主题为基本单位进行组织的,各个主题之间相互独立。

每个主题在逻辑结构上又可以由一个或者多个分区构成,分区数可以在创建主题的时候指定,也可以在主题创建之后再进行修改。

可以通过Kafka自带的用于主题管理操作的脚本kafka-topics.sh来修改某个主题的分区数,但是只能增加一个主题的分区数目,而不能减少其分区数目。

每个分区可以有一个或者多个副本,从副本中选出一个副本作为Leader,Leader负责与客户端进行读写操作,其他副本作为Follower。

生产者将消息发送到Leader副本的代理节点,而Follower副本从Leader副本同步数据;

从存储结构上,分区的每个副本在逻辑上对应一个Log对象,每个Log对象又划分为多个LogSegment,每个LogSegment包括一个日志文件和两个索引文件,其中两个索引文件分别为偏移量索引文件和时间戳索引文件。Log负责对 LogSegment的管理,在Log对象中维护了一个ConcurrentSkipListMap,保存该主题所有分区对应的所有LogSegment;

KafKa将日志文件封装为一个FileMessageSet对象,两个索引文件封装为OffsetIndex和TimeIndex对象;

数据文件用于存储消息,每条消息有一个固定长度的消息头和一个可变长度(N字节)的净荷(payload)组成,

2.日志管理器启动过程

3.日志加载以及恢复

4.日志清理

KafKa将一个主题的每个分区副本分成多个日志段文件,这样通过定时日志清理操作,将旧的日志文件及时的清理并释放出空间,以避免磁盘上的日志段文件过大而导致新的日志无法写入。同时分成多个日志段文件而不是一个文件也便于清理操作。

我们可以通过日志段的更新时间或者是日志段的大小控制进行日志的清理;

KafKa提供了日志删除(delete)和日志压缩(compact)两种清理日志的策略,通过参数cleanup.policy来指定日志清理的策略。

【日志删除】

在日志管理器启动的时候会启动一个后台定时任务用于定时删除日志段文件

【日志压缩】

另外一种日志清理的策略是日志压缩,这种策略是一种更细粒度的清理策略,它基于消息的Key,通过压缩每个Key对应的消息只保留最后一个版本的数据,该Key对应的其他版本在压缩的时候会被清除,类似数据库的更新操作;

6.副本管理器

6.副本管理器

引入副本机制使得KafKa能够在整个集群中只要保证至少有一个代理存活就不会影响整个集群的工作,从而大大提高了KafKa集群的可靠性和稳定性。

KafKa对代理的存活必须满足两个条件:

(1)一个存活的节点必须与ZooKeeper保持连接,维护与ZooKeeper的Session(通过ZooKeeper的心跳机制来实现)

(2)如果一个节点作为Follower副本,该节点必须即时的与分区的Leader副本保持消息绒布,不能落后太久;

1.分区

KafKa将一个主题在逻辑上分为一个或者多个分区,每个分区在物理存储上对应一个目录,

分区目录下存储的是该分区的日志段,包括日志数据文件和两个索引文件。每个分区又对应一个或者多个副本。

需要注意的是:分区的个数可以大于节点数,但是副本数不能大于节点数,因为副本需要分布在不同的节点上,这样才能达到备份的目的;

每个主题的某一个分区只能被同一个消费组下的其中一个消费者消费,因此我们说分区是消费并行度的基本单位。同时,对于上层应用而言,也是最小的存储单元。

尽管每个分区是有一系列有序的顺序端组成,从消费者角度来讲,我们订阅消费一个主题,也就是订阅了该主题的所有分区,当然也可以指定订阅主题的某个分区。

从生产者的角度来讲,我们可以通过指定消息的Key以及分区分配策略将消息发送到主题相应的分区当中;

KafKa将分区抽象为一个Partition对象,Partition定义了一个assignedReplicaMap引用用于保存该分区的所有副本,assignedReplicaMap是一个Pool类型的对象,并维护了该分区铜鼓的副本集合inSyncReplicas,同时Partition对象定义了分区对副本操作的方法,包括创建副本、副本角色切换、ISR列表维护以及调用日志管理器(LogManager)追加消息等。

2.副本

一个分区可以有一个或者多个副本,根据是否接受读写请求,又分为Leader副本和Follower副本,一个分区有1个Leader副本,有0个或者多个Follower副本;

Leader副本处理分区的所有的读写请求并维护自身以及Follower副本的状态信息,如LEO、HW等。Follower副本作为消费者从Leader副本拉取消息进行同步,当Leader失效的时候,通过分区Leader选举器从副本列表中选出一个副本作为新的Leader;

KafKa将副本抽象为一个Replica对象;

3.副本管理器启动过程

每个代理启动的时候都会启动一个副本管理器;

4.副本过期检查

副本管理器启动的时候启动了一个对副本过期检查的定时任务,该定时任务调用副本管理器的maybeShrinkIsr方法定期进行副本过期检查,其功能就是检查分区ISR是否需要进行收缩,即从ISR踢出与Leader数据不同步的副本;

5.追加消息

当生产者发送消息(ProduceRequest)或者消费者提交偏移量到内部主题的时候,由副本管理器的appendMessage将消息追加到相应分区的Leader副本中。

6.拉取消息

副本管理器除了负责将消息写入到Leader副本之外,同时还负责处理KafkaApis的FetchRequest请求,从分区Leader副本获取消息;

从KafKa中拉取消息的角色有两个,一个是KafKa的普通消费者,另一个就是Follower副本;

副本管理器通过FetchRequest请求的replicaId来区分拉取请求的角色;

因为每个副本有replicaId属性,即副本的replicaId总是非负数,而消费者的replicaId的值为-1;

7.副本同步过程

8.副本角色转换

当分区ISR发生变化的时候,控制器会向分区各副本对应的代理发出LeaderAndISRRequest请求,各代理的副本管理器接收到请求之后调用becomeLeaderOrFollower()方法进行处理;

9.关闭副本

关闭副本操作通常有两种方式:

第一种:将副本下线;

第二种:将副本下线并删除;

7.Handler

Handler其实是KafkaRequestHandler的简称,KafkaRequestHandler是一个线程类,负责从RequestChannel中读取请求然后交由KafkaApis处理;

8.动态配置管理器

动态配置管理器(DynamicConfiManager)主要用来对相关配置的变化进行处理,KafKa将可以通过ZooKeeper进行管理的配置划分为4个类型,称为配置类型(ConfigType)或者配置级别,每个配置类称为一个实体,这4个类型分别为Topic(主题级别)、Client(客户端级别)、User(用户级别)、和Broker(代理级别);

  • Topic(主题级别):监听器会调用主题级别配置处理器TopicConfigHandler进行处理
  • Client(客户端级别):ClientIdConfigHandler
  • User(用户级别):UserConfigHandler进行处理
  • 和Broker(代理级别):通知处理器会调用代理级别的配置处理器BrokerConfigHandler对配置进行处理

9.代理健康检测

KafKa集群依赖于ZooKeeper进行管理,每个代理启动的时候都向ZooKeeper进行一系列元数据的注册,即在ZooKeeper相应目录下创建一个临时节点,当代理与ZooKeeper连接断开之后,相应的临时节点也会被删除;

KafKa健康检测机制实现类是KafkaHealthcheck,该类实例化的时候创建一个SessionExpireListener监听器,该监听器实现了IZKStateListener接口,

10. KafKa内部监控

KafKa使用Yammer Metrics进行内部状态的监控,用来收集报告KafkaServer端和客户端的metrics信息。Yammer Metrics是Yammer提供的一个Java库,用于检测JVM上相关服务运行的状态。