早期旧版本的客户端采用 Scala 编写,功能比较简单,没有采用高性能的 Reactor 模式实现,也没有新版本中的回调功能
一、旧版消费者使用
Properties props = new Properties();
props.put("metadata.broker.list", "localhost:9092");
props.put("serializer.class", "kafka.serializer.StringEncoder");
Producer<Interger, String> producer = new Producer(new ProducerConfig(props));
KeyedMessage<Integer, String> data = new KeyedMessage("myTopic", "message");
producer.send(data);
producer.close();
二、同步/异步 模式
-
小对比
-
新版本将消息封装成 ProducerRecord 传给 KafkaProducer 对象,而旧版本将消息构造成 KeyedMessage 传给 Producer 对象
-
新版本 KafkaProducer 发送消息时返回的是 Future,可以借此实现 同步/异步 模式,并且可以自定义回调方法,旧版本没有提供自定义的回调方法,但是实现了 同步/异步 模式
-
配置 props.put(“producer.type”, “sync”); 默认是 sync
-
同步模式
-
生产者类型为 sync
-
同步模式下,消息直接交给事件处理器 EventHandler 处理
-
异步模式
-
生产者类型为 async
-
异步模式有一个基于阻塞队列的生产者发送线程 ProducerSendThread
-
异步模式下,消息会先缓存到阻塞队列中,再由生产者发送线程定时取出批量消息,交给事件处理器 EventHandler 去处理
class Producer[K,V](val config: ProducerConfig, private val eventHandler: EventHandler[K,V]) {
// 阻塞队列
private val queue = new LinkedBlockingQueue[KeyedMessage[K,V]](config.queueBufferingMaxMessages)
// 同步/异步 模式标记
private var sync: Boolean = true
// 客户端设置生产者类型
config.producerType match {
// 同步模式
case "sync" =>
// 异步模式
case "async" =>
sync = false
producerSendThread = new ProducerSendThread[K,V]("ProducerSendThread-" + config.clientId,
queue,
eventHandler,
config.queueBufferingMaxMs,
config.batchNumMessages,
config.clientId)
producerSendThread.start()
}
/**
* Sends the data, partitioned by key to the topic using either the
* synchronous or the asynchronous producer
* 使用同步或异步生成器将按键分区的数据发送到主题
* @param messages the producer data object that encapsulates the topic, key and message data
*/
def send(messages: KeyedMessage[K,V]*) {
lock synchronized {
// 先判断是否已经 close
if (hasShutdown.get)
throw new ProducerClosedException
// 消息 topic 状态
recordStats(messages)
// 同步/异步模式
if (sync)
eventHandler.handle(messages)
else
asyncSend(messages)
}
}
}
三、事件处理器处理客户端发送的消息
-
发送流程对比
-
新版本:将消息保存到记录收集器的不同分区中,每个分区都有一个存储批记录的队列,发送线程获取记录收集器的数据时,按照节点不同的分区进行分组,针对每个节点创建一个客户端请求
-
旧版本:对消息集序列化,然后通过 dispatchSerializedData() 方法分发序列化的消息,消息有可能会失败,如果分发序列化消息的方法有返回值,说明还有未完成的发送数据,就要重试
def handle(events: Seq[KeyedMessage[K,V]]) {
// 序列化事件
val serializedData = serialize(events)
// 未完成的请求
var outstandingProduceRequests = serializedData
while (remainingRetries > 0 && outstandingProduceRequests.nonEmpty) {
// 分发数据,将最原始无序的消息按照 Broker - 分区 - MessageSet 分组
outstandingProduceRequests = dispatchSerializedData(outstandingProduceRequests)
// 有返回值表示出错未完成,继续重试
if (outstandingProduceRequests.nonEmpty) {
// 重试次数 -1
remainingRetries -= 1
}
}
}
-
旧版本使用阻塞队列来存储所有消息,所有消息共用一个队列,只有在发送的时候才按照节点和分区进行分组,不管有没有满,属于同一个节点的所有分区消息都会一起被发送
-
分发序列化消息步骤
-
将消息序列按照节点和分区分组,每个节点都有一个 messagesPerBrokerMap 字典
-
将每个节点包括多分区的 messagesPerBrokerMap 原始消息,转换成 MessageSet
-
调用 send() 方法,向每个节点发送序列化后的消息集(MessageSet)
private def dispatchSerializedData(messages: Seq[KeyedMessage[K,Message]]): Seq[KeyedMessage[K, Message]] = {
// 对原始消息集进行分区和整理
val partitionedDataOpt = partitionAndCollate(messages)
val failedProduceRequests = new ArrayBuffer[KeyedMessage[K, Message]]
// 按照节点和分区进行分组
for ((brokerid, messagesPerBrokerMap) <- partitionedData) {
if (logger.isTraceEnabled) {
messagesPerBrokerMap.foreach(partitionAndEvent =>
trace("Handling event for Topic: %s, Broker: %d, Partitions: %s".format(partitionAndEvent._1, brokerid, partitionAndEvent._2)))
}
// 将 KeyedMessage 序列转换成 ByteBufferMessageSet 对象
val messageSetPerBrokerOpt = groupMessagesToSet(messagesPerBrokerMap)
messageSetPerBrokerOpt match {
case Some(messageSetPerBroker) =>
// 发送消息
val failedTopicPartitions = send(brokerid, messageSetPerBroker)
// 发送失败的分区
failedTopicPartitions.foreach(topicPartition => {
// 添加到失败列表重试
messagesPerBrokerMap.get(topicPartition) match {
case Some(data) => failedProduceRequests.appendAll(data)
// 没有失败的记录,所有消息发送成功
case None => // nothing
}
})
case None => // failed to group messages
messagesPerBrokerMap.values.foreach(m => failedProduceRequests.appendAll(m))
}
}
failedProduceRequests
}
-
新版本采用 NetworkClient 和 Reactor 模式将客户端请求发送到多个服务端节点,旧版本只能在客户端维护每个目标节点的网络连接
-
生产者连接池中保存了 BrokerId 到一个生产者网络连接对象 SyncProducer 的映射关系
-
send() 会根据目标节点,从连接池获取生产者网络连接对象,将生产者请求发送给目标节点
private def send(brokerId: Int, messagesPerTopic: collection.mutable.Map[TopicAndPartition, ByteBufferMessageSet]) = {
// 创建生产者请求
val producerRequest = new ProducerRequest(currentCorrelationId, config.clientId, config.requestRequiredAcks,config.requestTimeoutMs, messagesPerTopic)
// 从连接池获取生产者
val syncProducer = producerPool.getProducer(brokerId)
// 发送生产者请求
val response = syncProducer.send(producerRequest)
val failedPartitionsAndStatus = response.status.filter(_._2.error != Errors.NONE.code).toSeq
failedTopicPartitions = failedPartitionsAndStatus.map(partitionStatus => partitionStatus._1)
failedTopicPartitions
}
四、对消息集按照节点和分区进行整理
-
生产者的消息集 messages 没有区分主题和分区,所以在分发数据的时候对每条消息选择所属的分区,重新按照消息代理节点组织数据
-
partitionAndCollate() 方法对原始的消息集进行分区和整理
-
获取对应主题的分区集合
-
分区器从分区集合中为消息选择一个分区编号
-
获取选择的分区以及分区里面的主副本节点
-
构造 “消息代理节点编号 -> 分区 -> 消息集” 的字典数据结构
// 对原始数据集进行分区和整理,输入消息是无序的,输出结果按照节点分组,再按照 TopicAndPartition(TAP) 分组
def partitionAndCollate(messages: Seq[KeyedMessage[K,Message]]): Option[Map[Int, collection.mutable.Map[TopicAndPartition, Seq[KeyedMessage[K,Message]]]]] = {
// 消息代理节点编号 -> 分区 -> 消息集
val ret = new HashMap[Int, collection.mutable.Map[TopicAndPartition, Seq[KeyedMessage[K,Message]]]]
// 遍历每一条消息
for (message <- messages) {
// 一个 topic 有多个 partition
val topicPartitionsList = getPartitionListForTopic(message)
// 一条消息只会写到一个 partition,根据分区器分到一个分区编号
val partitionIndex = getPartition(message.topic, message.partitionKey, topicPartitionsList)
val brokerPartition = topicPartitionsList(partitionIndex)
// postpone the failure until the send operation, so that requests for other brokers are handled correctly
// 将失败推迟到发送操作,以便正确处理对其他代理的请求
// 一个分区有副本,但写的时候只写到主副本所在的节点
val leaderBrokerId = brokerPartition.leaderBrokerIdOpt.getOrElse(-1)
// dataPerBroker 是每个 Broker 的数据,ret 是最后的返回值,包含每个 Broker 的数据
var dataPerBroker: HashMap[TopicAndPartition, Seq[KeyedMessage[K,Message]]] = null
ret.get(leaderBrokerId) match {
case Some(element) =>
dataPerBroker = element.asInstanceOf[HashMap[TopicAndPartition, Seq[KeyedMessage[K,Message]]]]
// Broker 不存在里层 Map,创建一个新的 Map,并放入 ret 里
case None =>
dataPerBroker = new HashMap[TopicAndPartition, Seq[KeyedMessage[K,Message]]]
ret.put(leaderBrokerId, dataPerBroker)
}
val topicAndPartition = TopicAndPartition(message.topic, brokerPartition.partitionId)
// Broker 对应的消息集,相同的 Broker 下 topic-partition 不一定相同
var dataPerTopicPartition: ArrayBuffer[KeyedMessage[K,Message]] = null
dataPerBroker.get(topicAndPartition) match {
case Some(element) =>
dataPerTopicPartition = element.asInstanceOf[ArrayBuffer[KeyedMessage[K,Message]]]
// 分区消息集不存在,创建一个列表,并放入这个 Broker 数据
case None =>
dataPerTopicPartition = new ArrayBuffer[KeyedMessage[K,Message]]
dataPerBroker.put(topicAndPartition, dataPerTopicPartition)
}
// 真正将消息添加到集合中
dataPerTopicPartition.append(message)
}
Some(ret)
}
- 在实际处理时要针对消息 KeyedMessage 的 Seq 集合封装成更高层的 MessageSet,groupMessagesToSet() 会将每个分区的消息集转换成 ByteBufferMessageSet
// 将 KeyedMessage 序列转换成 ByteBufferMessageSet 对象
private def groupMessagesToSet(messagesPerTopicAndPartition: collection.mutable.Map[TopicAndPartition, Seq[KeyedMessage[K, Message]]]) = {
val messagesPerTopicPartition = messagesPerTopicAndPartition.map {
case (topicAndPartition, messages) =>
// KeyedMessage 包括了 Key、Value,其中 message 就是 value 的原始数据
val rawMessages = messages.map(_.message)
(topicAndPartition, config.compressionCodec match {
case NoCompressionCodec => new ByteBufferMessageSet(NoCompressionCodec, rawMessages: _*)
case _ => new ByteBufferMessageSet(config.compressionCodec, rawMessages: _*)
)
}
Some(messagesPerTopicPartition)
}
- 旧版本的和新版本的分区器对有 key 的操作类似,相同 key 会被分配到同一个分区,没有 Key 的话旧版本是将消息发送到 sendPartitionPerTopicCache 缓存,缓存没有更新前相同主题会发往相同分区,只有刷新缓存的时候才可能被定位到其他分区,不是严格意义上的随机分配
private def getPartition(topic: String, key: Any, topicPartitionList: Seq[PartitionAndLeader]): Int = {
val numPartitions = topicPartitionList.size
if(numPartitions <= 0)
throw new UnknownTopicOrPartitionException("Topic " + topic + " doesn't exist")
val partition =
if(key == null) {
// If the key is null, we don't really need a partitioner
// 如果键为空,我们实际上不需要分区器
// So we look up in the send partition cache for the topic to decide the target partition
// 因此,我们在发送分区缓存中查找主题,以确定目标分区
val id = sendPartitionPerTopicCache.get(topic)
id match {
case Some(partitionId) =>
// directly return the partitionId without checking availability of the leader,
// since we want to postpone the failure until the send operation anyways
partitionId
case None =>
// 存在副本的分区,类似于 Cluster 中的 availablePartitionsByTopic
val availablePartitions = topicPartitionList.filter(_.leaderBrokerIdOpt.isDefined)
if (availablePartitions.isEmpty)
throw new LeaderNotAvailableException("No leader for any partition in topic " + topic)
// 随机选择一个分区并放到缓存中,在缓存没有刷新时,相同主题只使用一个分区
val index = Utils.abs(Random.nextInt) % availablePartitions.size
val partitionId = availablePartitions(index).partitionId
sendPartitionPerTopicCache.put(topic, partitionId)
partitionId
}
} else
partitioner.partition(key, numPartitions)
partition
}
五、生产者使用阻塞通道发送请求
-
新版本的生产者客户端用选择器来维护多个节点的网络连接,旧版本使用连接池的方式(实际上就是一个 Map),为每个节点创建一个 SyncProducer,根据目标节点编号,获取对应的 SyncProducer,使用阻塞通道 BlockingChannel 来发送请求和读取响应结果
-
客户端发送请求时用 requiredAcks 参数表示是否需要应答
-
如果需要应答,会读取服务端返回的响应内容,设置到 ProducerResponse 对象中
-
不需要应答,send() 方法会返回 null
class SyncProducer(val config: SyncProducerConfig) extends Logging {
private val blockingChannel = new BlockingChannel(config.host, config.port, BlockingChannel.UseDefaultBufferSize, config.sendBufferBytes, config.requestTimeoutMs)
/**
* Send a message. If the producerRequest had required.request.acks=0, then the
* 发送一个信息。如果 producerRequest require.request.acks = 0,然后
* returned response object is null
* 返回的响应对象为空
*/
def send(producerRequest: ProducerRequest): ProducerResponse = {
...
response = doSend(producerRequest, if(producerRequest.requiredAcks == 0) false else true)
if(producerRequest.requiredAcks != 0) {
val producerResponse = ProducerResponse.readFrom(response.payload)
producerRequestStats.getProducerRequestStats(config.host, config.port).throttleTimeStats.update(producerResponse.throttleTime, TimeUnit.MILLISECONDS)
producerRequestStats.getProducerRequestAllBrokersStats.throttleTimeStats.update(producerResponse.throttleTime, TimeUnit.MILLISECONDS)
producerResponse
}
else
null
}
/**
* Common functionality for the public send methods
* 公共发送方法的通用功能
*/
private def doSend(request: RequestOrResponse, readResponse: Boolean = true): NetworkReceive = {
// 准备 NetworkReceive,读取服务端响应
var response: NetworkReceive = null
// 向阻塞类型的连接通道发送请求
blockingChannel.send(request)
if(readResponse) {
// 从阻塞通道读取响应
response = blockingChannel.receive()
}
}
}
- 读写操作使用的阻塞通道都来自 SocketChannel,send 会发送一个完整的 Send 对象,receive 会读取一个完整的 NetworkReceive 对象
/**
* A simple blocking channel with timeouts correctly enabled.
* 正确启用超时的简单阻塞通道。
* 阻塞通道表示客户端和目标节点建立的网络连接通道
*/
class BlockingChannel(...) {
private var channel: SocketChannel = null
private var readChannel: ReadableByteChannel = null
private var writeChannel: GatheringByteChannel = null
def connect() = lock synchronized {
if(!connected) {
channel = SocketChannel.open()
channel.configureBlocking(true)
channel.socket.connect(new InetSocketAddress(host, port), connectTimeoutMs)
writeChannel = channel
readChannel = Channels.newChannel(channel.socket().getInputStream)
}
}
def send(request: RequestOrResponse): Long = {
val send = new RequestOrResponseSend(connectionId, request)
send.writeCompletely(writeChannel)// 写到通道中
}
def receive(): NetworkReceive = {
val response = readCompletely(readChannel)
response.payload().rewind()// 读取到 ByteBuffer,回到缓冲区最开始,便于读取
response// 返回响应,如果客户端需要应答,直接使用 response.payload
}
private def readCompletely(channel: ReadableByteChannel): NetworkReceive = {
val response = new NetworkReceive
while (!response.complete())
response.readFromReadableChannel(channel)// 从只读通道中读取数据
response
}
}
- 旧版本和新版本的 Kafka 通道功能类似,都和原始的通道操作有关,两个版本的生产者请求对象都继承来 Send 接口,读取响应的时候都使用来相同的 NetworkReceive 类来保存服务端的响应结果