在KafkaServer 中,会将 SocketServer 的请求通道传给 Kafka 请求处理线程 KafkaRequestHandler 和 KafkaApis
-
请求通道就是处理器与请求处理线程和 KafkaApis 交换数据的地方
-
如果处理器往请求通道添加请求,请求处理器线程和 KafkaApis 都可以获取到请求通道中的请求
-
如果请求处理线程和 KafkaApis 往请求通达添加响应,处理器也可以从请求通道获取响应
-
处理器会将客户端发送的请求放到全局的请求队列(requestQueue)中,提供给请求处理线程获取,请求处理线程会将请求转发给 KafkaApis 处理,最后 KafkaApis 会将处理完的响应结果放到响应队列(responseQueue)中,供处理器获取后发送给客户端
// 请求通道会保存全局的请求队列和每个处理器对应的响应队列
class RequestChannel(val numProcessors: Int, val queueSize: Int) extends KafkaMetricsGroup {
private var responseListeners: List[(Int) => Unit] = Nil
def addResponseListener(onResponse: Int => Unit) {
responseListeners ::= onResponse
}
// queueSize 默认 500,全局的请求队列
private val requestQueue = new ArrayBlockingQueue[RequestChannel.Request](queueSize)
// 默认情况下有 3个 processor 线程,每个处理器都有一个响应队列
private val responseQueues = new Array[BlockingQueue[RequestChannel.Response]](numProcessors)
for(i <- 0 until numProcessors)
responseQueues(i) = new LinkedBlockingQueue[RequestChannel.Response]()
/** Get the next request or block until specified time has elapsed */
// 获取下一个请求或块,直到经过指定的时间
// 处理器从请求队列中取出请求,队列为空会阻塞,直到有处理器加入新的请求
def receiveRequest(timeout: Long): RequestChannel.Request = {
// 从队列里面获取 request 对象
requestQueue.poll(timeout, TimeUnit.MILLISECONDS)
}
/** Send a request to be handled, potentially blocking until there is room in the queue for the request */
// 发送一个待处理的请求,可能会阻塞,直到队列中有空间容纳该请求
// 如果请求队列满了,这个方法会阻塞在这里,直到有处理器取走一个请求
def sendRequest(request: RequestChannel.Request) {
requestQueue.put(request)
}
/** Get a response for the given processor if there is one */
// 获取给定处理器的响应(如果有的话)
def receiveResponse(processor: Int): RequestChannel.Response = {
// 获取对应线程的对应队列里面的响应对象
val response = responseQueues(processor).poll()
if (response != null)
response.request.responseDequeueTimeMs = SystemTime.milliseconds
response
}
/** Send a response back to the socket server to be sent over the network */
// 将响应发送回要通过网络发送的套接字服务器
// 发送响应给 SocketServer,并最终通过网络返回给客户端
def sendResponse(response: RequestChannel.Response) {
// 响应存入一个队列里面,先从数组里面先取出对应的 Processor 队列,然后把响应放到这个队列里面
responseQueues(response.processor).put(response)
for(onResponse <- responseListeners)
onResponse(response.processor)
}
}
-
请求通道保存了请求和响应两种类型的队列,它的各个方法中关于请求和响应的接收和发送是有顺序的
-
发送请求-接收请求-发送响应-接收响应
-
sendRequest():处理器接收到客户端请求后,将请求放入请求队列
-
receiveRequest():请求处理线程从队列中获取请求,并交给 KafkaApis 处理
-
sendResponse():KafkaApis 处理完,将响应结果放入响应队列
-
receiveResponse():处理器从响应队列中获取响应结果发送给客户端
-
由于一个 SocketServer 有多个处理器,每个处理器都负责一部分客户端的请求,如果请求 A 发送给处理器 A,那么对应的响应也只能发送给处理器 A
-
虽然请求队列是所有处理器全局共享的,但是最后 KafkaApis 会将请求的响应都放入处理器对应的响应队列中
-
处理器的 processCompletedReceives() 会往请求通道的请求队列添加请求,processNewResponses() 从请求通道的响应队列中获取响应