08、Zookeeper WATCH

watchManager涉及watch机制,内容较多,又要针对watch进行展开了

本节讲解

Watcher相关类简介,类图说明
Watcher的意义,通知状态(keeperState)与事件类型(EventType)
WatchedEvent 和 WatcherEvent 描述zk检测到变化的事件,以及对应用于网络传输的封装类
ClientWatchManager接口以及实现类ZKWatchManager :client端完成根据Event找到需要触发的watches
WatcherSetEventPair 将Event以及对应需要触发的watches集合进行组合绑定

2.简介

UML图如下,红色线代表内部类

 

watcher相关类图

主要的类简介如下:

Watcher,接口类型,其定义了process方法,另外定义内部类Event,再包含内部类KeeperState和EventType来描述Event发生时zk的状态以及对应event类型
WatchedEvent,代表zk上一个Watcher能够回应的变化,包含了变化事件的类型,zk状态以及变化影响的znode的path
WatcherEvent : 是WatchedEvent用于网络传输的封装类
ClientWatchManager:接口,根据Event得到需要通知的watcher
ZKWatchManager为ClientWatchManager的实现

下面进行源码讲解

3.Watcher

Watcher是什么

ZK中引入Watcher机制来实现分布式的通知功能
ZK允许客户端向服务端注册一个Watcher监听,当服务点的的指定事件触发监听时,那么服务端就会向客户端发送事件通知,以便客户端完成逻辑操作(即客户端向服务端注册监听,并将watcher对象存在客户端的Watchermanager中
服务端触发事件后,向客户端发送通知,客户端收到通知后从wacherManager中取出对象来执行回调逻辑)

特性

一次性:一旦一个watcher被触发,ZK都会将其从相应的的存储中移除,所以watcher是需要每注册一次,才可触发一次。
客户端串行执行:客户端watcher回调过程是一个串行同步的过程
轻量:watcher数据结构中只包含:通知状态、事件类型和节点路径

在ZooKeeper中,接口类Watcher定义了事件通知相关的逻辑,包含了KeeperState和EventType两个枚举类,分别代表通知状态和事件类型。

类图如下

 

Watcher类图

简单介绍上面类图就是

Watcher接口拥有process函数,用于处理回调
内部类Event又包含内部类KeeperState以及EventType
KeeperState用于记录Event发生时的zk状态(通知状态)
EventType用于记录Event的类型

3.1方法process

//回调函数实现该函数,表示根据event执行的行为
abstract public void process(WatchedEvent event);

3.2内部类Event

包含KeeperState和EventType两个内部类,通过枚举类实现
方法很简单,就是int值与对应枚举类型的转换
两者的枚举类型以及两者之间的关系,触发条件可以参考《paxos到zk》中的图

 

KeeperState与EventType一览表

4.WatchedEvent 和 WatcherEvent

WatchedEvent :代表zk上一个Watcher能够回应的变化,包含了变化事件的类型,zk状态以及变化影响的znode的path
WatcherEvent : 是WatchedEvent用于网络传输的封装类

WatchedEvent 类图如下

 

WatchedEvent类图

三个成员变量很好的解释了WatchedEvent的意义,即事件的类型,zk状态以及变化影响的znode的path
方法基本都好理解,涉及WatcherEvent 有一个构造方法和一个getWrapper方法
这里稍微强调一下 getWrapper方法

/**
 *  Convert WatchedEvent to type that can be sent over network
 */
//转化成可供网络传输,序列化的WatcherEvent
public WatcherEvent getWrapper() {
    return new WatcherEvent(eventType.getIntValue(), 
                            keeperState.getIntValue(), 
                            path);
}
}

WatcherEvent实现了Record接口,可以理解为WatchedEvent用于网络传输的封装类

5.ClientWatchManager接口和实现类ZKWatchManager

ClientWatchManager接口用户根据Event得到需要通知的watcher
ZKWatchManager为ClientWatchManager的实现

ClientWatchManager接口只有一个函数,源码解析如下

//ClientWatchManager负责根据Event得到需要通知的watcher,该manager本身并不进行通知
public Set<Watcher> materialize(Watcher.Event.KeeperState state,
    Watcher.Event.EventType type, String path);

默认实现类ZKWatchManager,在Zookeeper类中,源码解析如下

private static class ZKWatchManager implements ClientWatchManager {
    private final Map<String, Set<Watcher>> dataWatches =
        new HashMap<String, Set<Watcher>>();//针对内容的watch
    private final Map<String, Set<Watcher>> existWatches =
        new HashMap<String, Set<Watcher>>();//针对exist API相关的watch
    private final Map<String, Set<Watcher>> childWatches =
        new HashMap<String, Set<Watcher>>();//针对getChildren API相关的watch

    private volatile Watcher defaultWatcher;//client传递的,默认的watcher实现

    final private void addTo(Set<Watcher> from, Set<Watcher> to) {
        if (from != null) {
            to.addAll(from);
        }
    }

    /* (non-Javadoc)
     * @see org.apache.zookeeper.ClientWatchManager#materialize(Event.KeeperState, 
     *                                                        Event.EventType, java.lang.String)
     */
    @Override
    public Set<Watcher> materialize(Watcher.Event.KeeperState state,
                                    Watcher.Event.EventType type,
                                    String clientPath)
    {
        Set<Watcher> result = new HashSet<Watcher>();

        switch (type) {
        case None://eventType是null
            // 则所有dataWatches,existWatches,childWatches都需要被通知,???为什么要这样干
            result.add(defaultWatcher);//添加默认watcher
            boolean clear = ClientCnxn.getDisableAutoResetWatch() &&
                    state != Watcher.Event.KeeperState.SyncConnected;//获取clear标记

            synchronized(dataWatches) {
                for(Set<Watcher> ws: dataWatches.values()) {
                    result.addAll(ws);
                }
                if (clear) {
                    dataWatches.clear();
                }
            }

            synchronized(existWatches) {
                for(Set<Watcher> ws: existWatches.values()) {
                    result.addAll(ws);
                }
                if (clear) {
                    existWatches.clear();
                }
            }

            synchronized(childWatches) {
                for(Set<Watcher> ws: childWatches.values()) {
                    result.addAll(ws);
                }
                if (clear) {
                    childWatches.clear();
                }
            }

            return result;
        case NodeDataChanged:
        case NodeCreated:
            //如果节点内容变化或者创建
            synchronized (dataWatches) {
                addTo(dataWatches.remove(clientPath), result);//从dataWatches中移除,并且添加到result中
            }
            synchronized (existWatches) {
                addTo(existWatches.remove(clientPath), result);//从existWatches中移除,并且添加到result中
            }
            break;
        case NodeChildrenChanged:
            synchronized (childWatches) {
                addTo(childWatches.remove(clientPath), result);
            }
            break;
        case NodeDeleted:
            synchronized (dataWatches) {
                addTo(dataWatches.remove(clientPath), result);
            }
            // XXX This shouldn't be needed, but just in case
            synchronized (existWatches) {
                Set<Watcher> list = existWatches.remove(clientPath);
                if (list != null) {
                    addTo(existWatches.remove(clientPath), result);
                    LOG.warn("We are triggering an exists watch for delete! Shouldn't happen!");
                }
            }
            synchronized (childWatches) {
                addTo(childWatches.remove(clientPath), result);
            }
            break;
        default://默认处理
            String msg = "Unhandled watch event type " + type
                + " with state " + state + " on path " + clientPath;
            LOG.error(msg);
            throw new RuntimeException(msg);
        }
        //返回结果
        return result;
    }
}

该方法在事件发生后,返回需要被通知的Watcher集合。
是根据已经注册的watches(分为三类,data,children,exist),根据path找到对应的watches,得到一个result集合进行返回
这里留下个疑问

watches的注册是在哪里完成,这个后面再讲
为什么碰到case None,所有watches都要被触发,这个目前不是很理解

6.WatcherSetEventPair

WatcherSetEventPair 将Event以及对应需要触发的watches集合进行组合绑定

这个类在ClientCnxn中,代码很简单

private static class WatcherSetEventPair {
    private final Set<Watcher> watchers;//事件触发需要被通知的watches集合
    private final WatchedEvent event;//事件

    public WatcherSetEventPair(Set<Watcher> watchers, WatchedEvent event) {
        this.watchers = watchers;
        this.event = event;
    }
}

7.思考

Watcher.Event.KeeperState

这个可以叫成通知状态,也可以理解为事件发生时的zk状态

watcher特性中,"一次性"在client端的体现

ZooKeeper.ZKWatchManager#materialize 中可以看到
被触发的watches从相应的类别(data,children,exist)中删除了,所以在client端是一次性的

为什么需要WatcherSetEventPair 这个类

因为watcher接口process函数需要event参数
那么在ClientWatchManager完成了根据event找到对应的watchers之后
就可以直接调用watcher.process(event)了

但是!!!由于ClientCnxn.EventThread是异步处理的,通过生产消费完成
在processEvent的函数中,要取出一个数据结构Object,既包含watchers集合,又要包含event,所以就把两者组合在一起出现了WatcherSetEventPair 这个类

watcher特性中,"一次性"在server端的体现

在下面几讲WatchManager会讲

ZooKeeper.ZKWatchManager#materialize 里面三个watches的注册是如何完成的

这一块的代码只有三个watches的remove操作
这个在watch机制中会讲

8.问题

ZooKeeper.ZKWatchManager#materialize 为什么碰到case None,所有watches都要被触发

这个目前不是很理解,不知道为什么要这样设计

版权声明:「DDKK.COM 弟弟快看,程序员编程资料站」本站文章,版权归原作者所有