07、Zookeeper 源码解析 - 通信协议

一、序列化和反序列化

ZooKeeper中的序列化和反序列化是通过Record,接口中有两个方法。

public interface Record {
   
     
    //序列化
    void serialize(OutputArchive archive, String tag) throws IOException;
    //反序列化
    void deserialize(InputArchive archive, String tag) throws IOException;
}

ZooKeeper中就是通过这个接口来实现,请求数据的反序列以及文件快照和日志快照的序列化到本地,它具有很多的实现类,如下所示:
 
每一个实现类都是对当前的数据,需要进行相应数据格式的序列化和反序列化的实现,底层是通过DataOutputStream来实现。

二、通信协议

本文将从源码的角度去分析ZooKeeper的通信协议,通信一般法分为请求和响应。

1、请求协议

回到NIOServerCnxn的doIO方法,里边会调用socket的read方法如下:

//传递一个ByteBuffer实例
int rc = sock.read(incomingBuffer);

incomingBuffer就是我们用来读取数据的缓冲区对象,我们来看看其初始化过程:

private final ByteBuffer lenBuffer = ByteBuffer.allocate(4);

protected ByteBuffer incomingBuffer = lenBuffer;

所以,incomingBuffer默认是先读取四个字节的数据。此时会调用readLength(k)方法,这个方法就是读取当前缓冲区中的数据。刚好是一个int类型(int类型是四个字节),这个值表示接下来要读取的数据大小,得到数据大小后会重新按照读到的长度分配incomingBuffer容量,此时我们可以知道请求协议中头四个字节(一个int)是用来保存后续需要发送的数据大小。

接下来就是记性读取剩余的数据,此时分为两步走,一个是建立连接,调用readConnectRequest(),创建和保存session过程,也就是初始化当前会话。接下来就是建立请求调用readRequest()->processPacket():
此时会通过RequestHeader来处理请求头,他是Record接口的一个实现类。查看其反序列化方法:

  public void deserialize(InputArchive a_, String tag) throws java.io.IOException {
   
     
    a_.startRecord(tag);
    xid=a_.readInt("xid");
    type=a_.readInt("type");
    a_.endRecord(tag);
}

也就是请求头会包含两个值xid和type都是一个int类型,xid表示当前的会话请求的顺序值,type表示当前的需要执行的类型(create\ls\get等,在ZooDefs的OpCode中可以查看具体的请求类型比如create=1),剩下的就是请求体,每一种请求体都是一个Record的实现类。

所以,请求协议如下:

数据长度 请求头 请求体
四个字节 xid:四个字节,type:四个字节 n个字节

每一类请求体都会有对应的对象进行封装,接下来我们以创建一个节点为例:

在PrepRequestProcessor的pRequestHelper方法中会创建一个 CreateRequest对象,在pRequest2TxnCreate方法中会调用请求体的解析:

ByteBufferInputStream.byteBuffer2Record(request.request, record);

此时的Record就是CreateRequest的实现,我们来看看其反序列化方法:

public void deserialize(InputArchive a_, String tag) throws java.io.IOException {

 
//一个空的实现
a_.startRecord(tag);
//读取节点路径,String类型的读取,先读取当前需要读取的字符串长度,然后再根据长度读取字符串内容
path=a_.readString("path");
//读取节点内容,也是先读取长度再读取内容
data=a_.readBuffer("data");
{
   
     //读取当前的传递过来的权限验证,首先会读取当前的权限认证有多少个
      Index vidx1 = a_.startVector("acl");
      if (vidx1!= null) {
   
               acl=new java.util.ArrayList<org.apache.zookeeper.data.ACL>();
      //循环读取权限值,权限的读取又是通过ACL来实现
          for (; !vidx1.done(); vidx1.incr()) {
   
     
    org.apache.zookeeper.data.ACL e1;
    e1= new org.apache.zookeeper.data.ACL();
    a_.readRecord(e1,"e1");
            acl.add(e1);
          }
      }
    a_.endVector("acl");
    }
    //读取当前的参数值
    flags=a_.readInt("flags");
    a_.endRecord(tag);
}

此时,客户端如果严格按照这样的实现去发送请求就会创建相应的节点。

三、响应协议

服务端处理完请求之后,就会进行响应,这部分简单分析一下服务端响应的一个数据格式。

所有的请求最终都会落到FinalRequestProcessor中进行处理,服务端处理完请求之后,就会创建一个带有Response结尾的响应数据对象(Record),还是以创建节点为例:

服务端处理完请求之后,会创建一个new Create2Response(rc.path, rc.stat)。然后调用NIOServerCnxn实例的sendResponse方法进行返回:

public int sendResponse(ReplyHeader h, Record r, String tag, String cacheKey, Stat stat, int opCode) {

 
    int responseSize = 0;
    try {

 
    //序列化返回对象
        ByteBuffer[] bb = serialize(h, r, tag, cacheKey, stat, opCode);
        responseSize = bb[0].getInt();
        bb[0].rewind();
        //发送响应数据
        sendBuffer(bb);
        decrOutstandingAndCheckThrottle(h);
    } catch (Exception e) {

 
        LOG.warn("Unexpected exception. Destruction averted.", e);
    }
    return responseSize;
}

此时序列化方法就是具体的响应协议的实现,如下所示:

protected ByteBuffer[] serialize(ReplyHeader h, Record r, String tag,
                                 String cacheKey, Stat stat, int opCode) throws IOException {

 
    //序列化响应头,是通过ReplyHeader 实现
    byte[] header = serializeRecord(h);
    //得到响应体
    byte[] data = serializeRecord(r);
    //计算响应体数据长度
    int dataLength = data == null ? 0 : data.length;
    //总的数据长度
    int packetLength = header.length + dataLength;
    //设置当前响应的数据总长度
    ByteBuffer lengthBuffer = ByteBuffer.allocate(4).putInt(packetLength);
    lengthBuffer.rewind();

    int bufferLen = data != null ? 3 : 2;
    ByteBuffer[] buffers = new ByteBuffer[bufferLen];

    buffers[0] = lengthBuffer;
    //写入响应头
    buffers[1] = ByteBuffer.wrap(header);
    if (data != null) {

 
    //写入响应体
        buffers[2] = ByteBuffer.wrap(data);
    }
    return buffers;
}

也就是前四个字节写入当前响应数据的总长度,接下来就是响应头和响应体。
响应头ReplyHeader:

public void serialize(OutputArchive a_, String tag) throws java.io.IOException {

 
a_.startRecord(this,tag);
//四个字节表示当前的xid
a_.writeInt(xid,"xid");
//八个字节表示事务id
a_.writeLong(zxid,"zxid");
//四个字节表示当前处理结果
a_.writeInt(err,"err");
a_.endRecord(this,tag);
}

响应体:

public void serialize(OutputArchive a_, String tag) throws java.io.IOException {

 
a_.startRecord(this,tag);
//写入当前节点的路径
a_.writeString(path,"path");
//写入当前节点的状态值
a_.writeRecord(stat,"stat");
a_.endRecord(this,tag);
}

所以完整的响应协议如下:

数据长度 响应头 响应体
四个字节 xid:四个字节,zxid:八个字节,err:四个字节 n个字节

四、总结

ZooKeeper中通过Record接口实现数据序列化和反序列化过程,每一种事务请求都会有对应的Record的实现,以及相应的响应实现,这两种实现主要是来处理请求体和响应体,在了解完所有的这些实现,就可以自己编写属于自己的客户端api去连接ZooKeeper然后进行操作。

以上,有任何不对的地方,请留言指正,敬请谅解。