ZkClient 事件监听器的疑问

发布于 2022-03-07 17:21:14 字数 1856 浏览 724 评论 1

public static void main(String[] args) throws Exception {
	ZkClient zkClient = new ZkClient("127.0.0.1:2181", 5000);
	
	zkClient.createEphemeral("/zkclient");
	zkClient.subscribeDataChanges("/zkclient", new IZkDataListener() {			
		@Override
		public void handleDataDeleted(String dataPath) throws Exception {
			System.out.println(String.format("The node '%s' is deleted.", dataPath));
		}
		@Override
		public void handleDataChange(String dataPath, Object data) throws Exception {
			System.out.println(String.format("The node '%s' is changed, now its data is '%s'.", dataPath, data));
		}
	});
	
	zkClient.writeData("/zkclient", "hello world");
	Thread.sleep(1000);
	zkClient.delete("/zkclient");
	Thread.sleep(1000);
	
	zkClient.close();
}

这段代码输出的内容是:

The node '/zkclient' is changed, now its data is 'hello world'.
The node '/zkclient' is deleted.

 

public static void main(String[] args) throws Exception {
	ZkClient zkClient = new ZkClient("127.0.0.1:2181", 5000);
	
	zkClient.createEphemeral("/zkclient");
	zkClient.subscribeDataChanges("/zkclient", new IZkDataListener() {			
		@Override
		public void handleDataDeleted(String dataPath) throws Exception {
			System.out.println(String.format("The node '%s' is deleted.", dataPath));
		}
		@Override
		public void handleDataChange(String dataPath, Object data) throws Exception {
			System.out.println(String.format("The node '%s' is changed, now its data is '%s'.", dataPath, data));
		}
	});
	
	zkClient.writeData("/zkclient", "hello world");
	zkClient.delete("/zkclient");
	Thread.sleep(1000);
	
	zkClient.close();
}

这段代码的输出内容却是:

The node '/zkclient' is deleted.
The node '/zkclient' is deleted.

为什么会接收到两次删除节点事件?

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(1

苍暮颜 2022-03-10 21:10:08

两年多前的问题,今天玩zk的时候我也遇到了。跟了下源码,发现ZkClient里通知线程里处理逻辑是这样的:

public void process(WatchedEvent event) {
    LOG.debug("Received event: " + event);
    _zookeeperEventThread = Thread.currentThread();

    boolean stateChanged = event.getPath() == null;
    boolean znodeChanged = event.getPath() != null;
    boolean dataChanged = event.getType() == EventType.NodeDataChanged || event.getType() == EventType.NodeDeleted || event.getType() == EventType.NodeCreated
            || event.getType() == EventType.NodeChildrenChanged;

    getEventLock().lock();
    try {

        // We might have to install child change event listener if a new node was created
        if (getShutdownTrigger()) {
            LOG.debug("ignoring event '{" + event.getType() + " | " + event.getPath() + "}' since shutdown triggered");
            return;
        }
        if (stateChanged) {
            processStateChanged(event);
        }
        if (dataChanged) {
            processDataOrChildChange(event);
        }
    } finally {
        if (stateChanged) {
            getEventLock().getStateChangedCondition().signalAll();

            // If the session expired we have to signal all conditions, because watches might have been removed and
            // there is no guarantee that those
            // conditions will be signaled at all after an Expired event
            // TODO PVo write a test for this
            if (event.getState() == KeeperState.Expired) {
                getEventLock().getZNodeEventCondition().signalAll();
                getEventLock().getDataChangedCondition().signalAll();
                // We also have to notify all listeners that something might have changed
                fireAllEvents();
            }
        }
        if (znodeChanged) {
            getEventLock().getZNodeEventCondition().signalAll();
        }
        if (dataChanged) {
            getEventLock().getDataChangedCondition().signalAll();
        }
        getEventLock().unlock();
        LOG.debug("Leaving process event");
    }
}

其中,不管是数据变化还是节点被删除,都会进到下面这个方法

 

if (dataChanged) {
    processDataOrChildChange(event);
}

processDataOrChildChange方法:

private void processDataOrChildChange(WatchedEvent event) {
    final String path = event.getPath();

    if (event.getType() == EventType.NodeChildrenChanged || event.getType() == EventType.NodeCreated || event.getType() == EventType.NodeDeleted) {
        Set<IZkChildListener> childListeners = _childListener.get(path);
        if (childListeners != null && !childListeners.isEmpty()) {
            fireChildChangedEvents(path, childListeners);
        }
    }

    if (event.getType() == EventType.NodeDataChanged || event.getType() == EventType.NodeDeleted || event.getType() == EventType.NodeCreated) {
        Set<IZkDataListener> listeners = _dataListener.get(path);
        if (listeners != null && !listeners.isEmpty()) {
            fireDataChangedEvents(event.getPath(), listeners);
        }
    }
}

会进到fireDataChangedEvents方法:

private void fireDataChangedEvents(final String path, Set<IZkDataListener> listeners) {
    for (final IZkDataListener listener : listeners) {
        _eventThread.send(new ZkEvent("Data of " + path + " changed sent to " + listener) {

            @Override
            public void run() throws Exception {
                // reinstall watch
                exists(path, true);
                try {
                    Object data = readData(path, null, true);
                    listener.handleDataChange(path, data);
                } catch (ZkNoNodeException e) {
                    listener.handleDataDeleted(path);
                }
            }
        });
    }
}

fireDataChangedEvents方法可以看出,是先处理handleDataChange,如果这个节点不存在了,就当做handleDataDeleted事件处理了。代码跟到这原因大概就清晰了。

因为ZK的watch机制是异步的,修改数据的时候,触发了一次fireDataChangedEvents,但是在通知线程里,代码还没走到handleDataChange方法的时候,main方法的主线程把节点删除了,所以,等走到handleDataChange方法的时候,节点已经不存在了,就会报异常,从而触发了handleDataDeleted方法。

总结:原因就是,代码执行顺序是这样的:1、主线程先对节点做修改数据操作,导致通知线程会收到一个dataChangedEvent通知,2、紧接着,主线程又把节点删除了,导致通知线程又会收到一个dataChangedEvent通知。3、通知线程会按顺序处理dataChangedEvent通知,在处理dataChangedEvent通知的时候,该节点已经在步骤2中被删除了,导致抛异常,从而触发handleDataDeleted方法调用,第二个本身也是会触发handleDataDeleted方法调用,所以看到的就是收到两个handleDataDeleted调用结果。

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文