涉及网络通信时,一般使用选择器模型,选择器使用 Java NIO 异步非阻塞方式管理连接和读写请求,好处就是单个线程就能管理多个网络连接通道,生产者只需要使用一个选择器就能同时和 Kafka 集群的多个服务端进行网络通信
一、Java NIO 的一些概念
- SocketChannel 客户端网络连接通道,底层的字节数据读写都发生在通道上,通道会和字节缓冲区一起使用(channel.read(buffer) / channel.write(buffer))
- Selector 选择器,发送在通道上的事件有读和写,选择器通过选择键的方式监听读写事件
- SelectionKey 选择键,将通道注册到选择器上,channel.register(selector) 返回选择键,读写事件发生时,通过选择键就可以得到对应的通道进行读写操作
二、客户端连接服务端并建立 Kafka 通道
-
选择器的 connect() 方法创建客户端到指定远程服务器的网络连接,使用的是 Java NIO 的 SocketChannel 对象完成
-
这里创建了更抽象的 KafkaChannel,并使用 SelectionKeys.attach(KafkaChannel) 将选择键和 KafkaChannel 关联起来
-
轮询的时候可以使用 SelectionKeys.attachment() 获取对应的 KafkaChannel
-
选择器还会维护一个节点编号和 KafkaChannel 的 Map,方便客户端根据节点编号获取 KafkaChannel
-
关系梳理
-
SocketChannel 注册到 Selector 上返回 SelectionKeys,将 Selector 用于构建传输层,再把传输层用于构造 KafkaChannel
-
这样 KafkaChannel 和 SocketChannel 通过键进行了关联,本质上 KafkaChannel 就是对 SocketChannel 的一层封装
/**
* Begin connecting to the given address and add the connection to this nioSelector associated with the given id
* number.
* 开始连接到给定的地址,并将连接添加到与给定id号关联的nioSelector中。
* <p>
* Note that this call only initiates the connection, which will be completed on a future {@linkpoll(long)}
* call. Check {@linkconnected()} to see which (if any) connections have completed after a given poll call.
* 注意,此调用只启动连接,连接将在未来的{@linkpoll(long)}调用中完成。检查{@linkconnected()},查看在给定的轮询调用之后完成了哪些连接(如果有)。
*/
@Override
public void connect(String id, InetSocketAddress address, int sendBufferSize, int receiveBufferSize) throws IOException {
if (this.channels.containsKey(id))
throw new IllegalStateException("There is already a connection for id " + id);
// Java NIO 代码
// 获取 SocketChannel
SocketChannel socketChannel = SocketChannel.open();
// 设置非阻塞
socketChannel.configureBlocking(false);
Socket socket = socketChannel.socket();
socket.setKeepAlive(true);
// 设置参数,Producer 里面有些默认值
if (sendBufferSize != Selectable.USE_DEFAULT_BUFFER_SIZE)
socket.setSendBufferSize(sendBufferSize);
if (receiveBufferSize != Selectable.USE_DEFAULT_BUFFER_SIZE)
socket.setReceiveBufferSize(receiveBufferSize);
// 默认值是 false,代表要开启 Nagle 的算法,会把网络中小的数据包收集起来组成一个大的包再发送
socket.setTcpNoDelay(true);
boolean connected;
try {
// 尝试连接,立马成功返回 true,很久才成功会返回 false
connected = socketChannel.connect(address);
} catch (UnresolvedAddressException e) {
socketChannel.close();
throw new IOException("Can't resolve address: " + address, e);
} catch (IOException e) {
socketChannel.close();
throw e;
}
// 注册 Selector
SelectionKey key = socketChannel.register(nioSelector, SelectionKey.OP_CONNECT);
// 封装一个 KafkaChannel
KafkaChannel channel = channelBuilder.buildChannel(id, key, maxReceiveSize);
// 把 key 和 KafkaChannel 关联起来
key.attach(channel);
// 缓存起来
this.channels.put(id, channel);
// 正常情况下,网络不能完成连接
// 如果连接上了,取消前面的注册 OP_CONNECT 事件
if (connected) {
// OP_CONNECT won't trigger for immediately connected channels 对于立即连接的通道,不会触发OP_CONNECT
log.debug("Immediately connected to node {}", channel.id());
immediatelyConnectedKeys.add(key);
key.interestOps(0);
}
}
三、Kafka 通道和网络传输层
-
构建 Kafka 通道的传输层有多种实现,比如纯文本模式、sasl、ssl 加密模式,PlaintextTransportLayer 就是纯文本的传输层实现
-
传输层面向底层的字节缓冲区,操作的是字节流,KafkaChannel 的读写操作会利用传输层操作底层字节缓冲区,从而构造出 NetworkReceive 和 Send 对象
-
Send 字节缓冲区表示要发送出去的数据,NetworkReceive 的 size 缓冲区表示数据的长度,buffer 缓冲区表示数据的内容
-
传输层对 SocketChannel 做了轻量级的封装,和 SocketChannel 一样都实现了 ScatteringByteChannel, GatheringByteChannel
-
TransportLayer 作为 kafkaChannel 的成员变量,Selector 在调用 KafkaChannel read() 和 write() 时,最终会通过 NetworkReceive.readFrom() 和 Send.writeTo() 来调用 TransportLayer 底层的 SocketChannel read() write() 方法
四、Kafka 通道上的读写操作
1. write 调用流程
- NetworkClient.send() -> Selector.send() -> KafkaChannel.setSend() 这时候已经绑定了一个 OP_WRITE 事件,同时将 Send 设置为成员变量
// NetworkClient.send()
public void send(ClientRequest request, long now) {
......
selector.send(request.request());
}
// Selector.send()
public void send(Send send) {
// 获取到 KafkaChannel,往里面存放一个发送请求
KafkaChannel channel = channelOrFail(send.destination());
try {
// setSend 绑定一个 OP_WRITE 事件
channel.setSend(send);
} catch (CancelledKeyException e) {
......
}
}
// KafkaChannel.setSend()
public void setSend(Send send) {
// 往KafkaChannel 里面绑定一个发送出去到请求
this.send = send;
// 往 Selector 上绑定一个 OP_WRITE 事件,绑定之后就可以发送请求了
this.transportLayer.addInterestOps(SelectionKey.OP_WRITE);
}
-
Selector.poll() 会轮询监听,监听到 write 事件会调用 KafkaChannel.write() -> KafkaChannel.send() 方法调用 Send.writeTo() 方法,最终使用 TransportLayer.write() 方法调用 SocketChannel.write 方法写数据
-
如果一次没有发送完,Selector.poll() 会循环继续发送,发送完了会取消写操作
// Selector.poll() 里面调用到了 Selector.pollSelectionKeys(),这里是监听读写操作的方法
private void pollSelectionKeys(...) {
......
/* if channel is ready write to any sockets that have space in their buffer and for which we have data */
/**
* TODO 处理发送请求事件
* 如果通道已准备好,则写入缓冲区中有空间且有数据的任何套接字
* selector 已经注册了一个 OP_WRITE 事件
*/
if (channel.ready() && key.isWritable()) {
// 获取到要发送的那个网络请求,是这个句代码就往服务端发送数据了
// 消息被发送出去之后,移除 OP_WRITE
Send send = channel.write();
// 已经完成消息发送
if (send != null) {
this.completedSends.add(send);
this.sensors.recordBytesSent(channel.id(), send.size());
}
}
......
}
// KafkaChannel.write()
public Send write() throws IOException {
Send result = null;
// TODO send 就是发送方法
if (send != null && send(send)) {
result = send;
send = null;
}
return result;
}
// KafkaChannel.send()
private boolean send(Send send) throws IOException {
// TODO 最终执行发送请求的代码
send.writeTo(transportLayer);
// 如果发送完成,移除 OP_WRITE
if (send.completed())
transportLayer.removeInterestOps(SelectionKey.OP_WRITE);
return send.completed();
}
// Send.writeTo()
@Override
public long writeTo(GatheringByteChannel channel) throws IOException {
long written = channel.write(buffers);
remaining -= written;
......
return written;
}
// TransportLayer.write()
@Override
public long write(ByteBuffer[] srcs) throws IOException {
return socketChannel.write(srcs);
}
-
一个完整的发送请求和对应的事件监听步骤
-
设置 Send 请求到 KafkaChannel -> 注册写操作 -> 发送请求 -> Send 请求发送完成 -> 取消写操作
2. KafkaChannel write 具体步骤
- 通过 KafkaChannel.setSend() 方法设置要发送的请求对象,并注册写事件
- 客户端轮询到写事件时,会取出 KafkaChannel 中的发送请求,并发送给网络通道
- 如果本次写操作没有全部完成,那么写事件依然存在,客户端还会再次轮询到写事件
- 客户端新的轮询会继续发送请求,发送完成后就取消写事件,并设置返回结果
- 请求发送完成后,加入到 completedSends 集合中,这个数据会被调用者使用
- 请求全部发送完成,send 对象会被重置为空,下一次新的请求才可以继续进行
3. read 调用流程
-
和 write 操作类似,一次没有 read 完,也会调用多次,只有缓冲区数据被填满了,才表示接收到一个完整的 NetworkReceive
-
Selector.poll() 监听到 read 事件,调用 KafkaChannel.read() -> NetworkReceive.readFrom() 调用 TransportLayer.read() 方法
-
完成连接后会注册 OP_READ 事件,可以接收服务端发送回来的响应了
// Selector.poll() 里面调用到了 Selector.pollSelectionKeys(),这里是监听读写操作的方法
private void pollSelectionKeys(...) {
......
/* if channel is ready read from any connections that have readable data */
// 如果通道就绪,则从具有可读数据的任何连接读取
if (channel.ready() && key.isReadable() && !hasStagedReceive(channel)) {
NetworkReceive networkReceive;
// 接收服务端发送回来的响应(请求)
// networkReceive 服务端发送回来的响应
// 不断的读取数据,里面还设计粘包拆包问题
while ((networkReceive = channel.read()) != null)
// 这里迭代保证读取完整的响应,如果没有读取完整,就一直读
addToStagedReceives(channel, networkReceive);
}
......
}
// KafkaChannel.read()
public NetworkReceive read() throws IOException {
NetworkReceive result = null;
if (receive == null) {
receive = new NetworkReceive(maxReceiveSize, id);
}
// 一直在读取数据
receive(receive);
// 是否读完一个完整的响应消息
if (receive.complete()) {
receive.payload().rewind();
result = receive;
receive = null;
}
return result;
}
private long receive(NetworkReceive receive) throws IOException {
return receive.readFrom(transportLayer);
}
// NetworkReceive.readFromReadableChannel()
public long readFrom(ScatteringByteChannel channel) throws IOException {
return readFromReadableChannel(channel);
}
public long readFromReadableChannel(ReadableByteChannel channel) throws IOException {
int read = 0;
if (size.hasRemaining()) {
// 先读取 4字节 数据,代表后面消息体大小,如果还有剩余空间会一直循环读取
int bytesRead = channel.read(size);
if (bytesRead < 0)
throw new EOFException();
read += bytesRead;
// 一直读取到没有剩余空间,说明已经读取到一个 4字节 的 int 类型的数
if (!size.hasRemaining()) {
size.rewind();
int receiveSize = size.getInt();
if (receiveSize < 0)
throw new InvalidReceiveException("Invalid receive (size = " + receiveSize + ")");
if (maxSize != UNLIMITED && receiveSize > maxSize)
throw new InvalidReceiveException("Invalid receive (size = " + receiveSize + " larger than " + maxSize + ")");
// 分配一个内存大小空间,就是刚刚读出来的 4字节 int 大小
this.buffer = ByteBuffer.allocate(receiveSize);
}
}
if (buffer != null) {
// 读取数据
int bytesRead = channel.read(buffer);
if (bytesRead < 0)
throw new EOFException();
read += bytesRead;
}
return read;
}
- 这里 NetworkReceive.readFrom() 会调用 NetworkReceive.readFromReadableChannel(),这个方法是一个处理粘包拆包问题的方法
4. KafkaChannel read 具体步骤
- 客户端轮询到读事件时,调用 KafkaChannel.read() 方法,如果网络接收对象不存在,就新建一个
- 客户端读取网络通道的数据,并将数据填充到网络连接对象
- 如果本次读操作没有全部完成,客户端还会再次轮询到读事件
- 客户端新的轮询会继续读取,如果读取完成,则返回结果
- 读取完成后,放到暂时完成的列表中,这个数据会被调用者使用
- 读取全部完成,重置网络接收对象为空,下一次新的读取请求才可以继续进行
五、选择器的轮询
-
选择器的轮询根据选择键读写,分别调用 KafkaChannel 的 read() 和 write(),通过不断注册事件、执行事件处理、取消事件,客户端才会发送请求给客户端,并从服务端读取响应结果
-
选择器在轮询到各种事件,要么被提前注册(CONNECT),要么在处理事件的时候被注册(finishConnect() 注册 READ,setSend() 注册 WRITE)
-
都是交给 KafkaChannel 处理,通过底层网络连接通往的就是远程服务端节点,这就完成了服务端和客户端的通信
-
不同的注册事件在选择器的轮询下,会触发不同的事件处理
-
客户端建立连接时注册连接事件
-
发送请求时注册写事件
只有成功连接后,写事件才会被接着选择到
- 连接事件的会确认成功连接,并注册读事件
- 写事件发生时会将请求发送到服务端,接着客户端就开始等待服务端返回响应结果
@Override
public void poll(long timeout) throws IOException {
if (hasStagedReceives() || !immediatelyConnectedKeys.isEmpty())
timeout = 0;
// 从 Selector 上找到有多少个 key 注册了
int readyKeys = select(timeout);
if (readyKeys > 0 || !immediatelyConnectedKeys.isEmpty()) {
pollSelectionKeys(this.nioSelector.selectedKeys(), false, endSelect);
pollSelectionKeys(immediatelyConnectedKeys, true, endSelect);
}
// TODO 对 stagedReceives 里面的状态进行处理
// 没有新的选择键,说明要读取的已经都读取完
addToCompletedReceives();
......
}
private void pollSelectionKeys(Iterable<SelectionKey> selectionKeys,
boolean isImmediatelyConnected,
long currentTimeNanos) {
// 遍历所有的 key
Iterator<SelectionKey> iterator = selectionKeys.iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
iterator.remove();
// 根据 key 找到对应的 KafkaChannel
KafkaChannel channel = channel(key);
try {
/* complete any connections that have finished their handshake (either normally or immediately) */
// 完成任何已经完成握手的连接(正常或立即)
// OP_CONNECT
if (isImmediatelyConnected || key.isConnectable()) {
// TODO 最后完成网络连接
// 完成连接会绑定 OP_READ 事件
if (channel.finishConnect()) {
......}
/* if channel is not ready finish prepare */
// 如果通道没有准备好
if (channel.isConnected() && !channel.ready())
channel.prepare();
/* if channel is ready read from any connections that have readable data */
// 如果通道就绪,则从具有可读数据的任何连接读取
if (channel.ready() && key.isReadable() && !hasStagedReceive(channel)) {
NetworkReceive networkReceive;
// 接收服务端发送回来的响应(请求)
// networkReceive 服务端发送回来的响应
// 不断的读取数据,里面还设计粘包拆包问题
while ((networkReceive = channel.read()) != null)
// 这里迭代保证读取完整的响应,如果没有读取完整,就一直读
addToStagedReceives(channel, networkReceive);
}
/* if channel is ready write to any sockets that have space in their buffer and for which we have data */
/**
* TODO 处理发送请求事件
* 如果通道已准备好,则写入缓冲区中有空间且有数据的任何套接字
* selector 已经注册了一个 OP_WRITE 事件
*/
if (channel.ready() && key.isWritable()) {
// 获取到要发送的那个网络请求,是这个句代码就往服务端发送数据了
// 消息被发送出去之后,移除 OP_WRITE
Send send = channel.write();
// 已经完成消息发送
if (send != null) {
this.completedSends.add(send);
this.sensors.recordBytesSent(channel.id(), send.size());
}
}
} catch (Exception e) {
......
}
}
}