Zookeeper 源码分析 - NIOServerCnxnFactory
Zookeeper 可以说是业界最流行的分布式协调解决方案,其源码值得我们好好静下心来学习和研究。
这篇文章主要分析 NIOServerCnxnFactory 这个类。NIOServerCnxnFactory 和 NettyServerCnxnFactory 是 Zookeeper 服务端用来处理连接的核心类,前者基于 NIO,后者基于 Netty 框架。废话少说,让我们一起来看下 NIOServerCnxnFactory 这个类是如何实现的:)。
NIOServerCnxnFactory
NIOServerCnxnFactory 基于 NIO 实现了一个多线程的 ServerCnxnFactory,线程间的通信都是通过 queue 来完成的。NIOServerCnxnFactory 包含的线程如下:
- 1 个 accept 线程,用来监听端口并接收连接,然后把该连接分派给 selector 线程。
- N 个 selecotr 线程,每个 selctor 线程平均负责 1/N 的连接。使用 N 个 selector 线程的原因在于,在大量连接的场景下,select() 操作本身可能会成为性能瓶颈。
- N 个 worker 线程,用来负责 socket 的读写。如果 N 为 0,那么 selector 线程自身会进行 socket 读写。
- 1 个管理连接的线程,用来关闭空闲而且没有建立 session 的连接。
NIOServerCnxnFactory 的启动入口为 startup 方法,如下所示:
public void startup(ZooKeeperServer zks, boolean startServer)
throws IOException, InterruptedException {
//自身的启动逻辑
start();
//设置 zkServer
setZooKeeperServer(zks);
if (startServer) {
//启动 zkServer
zks.startdata();
zks.startup();
}
}
start() 方法包含自身的启动逻辑,而 zks.startdata() 和 zks.startup() 用来启动 zkServer。NIOServerCnxnFactory 是用来管理连接的,而数据处理逻辑则由 zkServer 完成。start() 方法的逻辑如下所示:
public void start() {
stopped = false;
//worker 线程服务,用来进行 socket 的 I/O
if (workerPool == null) {
workerPool = new WorkerService(
"NIOWorker", numWorkerThreads, false);
}
//selector 线程,用来监听 socket 事件
for(SelectorThread thread : selectorThreads) {
if (thread.getState() == Thread.State.NEW) {
thread.start();
}
}
// accept 线程
if (acceptThread.getState() == Thread.State.NEW) {
acceptThread.start();
}
// 连接管理线程
if (expirerThread.getState() == Thread.State.NEW) {
expirerThread.start();
}
}
可以看到,start 方法主要生成或启动上述的 accept 线程、selector 线程、worker 线程和连接管理线程。
accept 线程
accept 线程的 run() 方法如下:
public void run() { try { //判断是否需要退出 while (!stopped && !acceptSocket.socket().isClosed()) { try { //监听连接事件,并建立连接 select(); } catch (RuntimeException e) { LOG.warn("Ignoring unexpected runtime exception", e); } catch (Exception e) { LOG.warn("Ignoring unexpected exception", e); } } } finally { //关闭 selector closeSelector(); if (!reconfiguring) { //唤醒 selector 线程并通知 worker 线程关闭 NIOServerCnxnFactory.this.stop(); } LOG.info("accept thread exitted run method"); } }
accept 线程主要监听连接事件,并建立连接,并分派给 selector。在退出时,关闭它自身的 selector,然后唤醒用来进行 socket I/O 的 selector 线程,最后通知 worker 线程退出。
accept 线程在 select 方法中监听连接事件,然后进入 doAccept() 方法建立连接,分派给 selector 线程,doAccept() 方法如下所示:
private boolean doAccept() { boolean accepted = false; SocketChannel sc = null; try { //建立连接 sc = acceptSocket.accept(); accepted = true; //防止来自一个 IP 的连接是否过多 InetAddress ia = sc.socket().getInetAddress(); int cnxncount = getClientCnxnCount(ia); if (maxClientCnxns > 0 && cnxncount >= maxClientCnxns){ throw new IOException("Too many connections from " + ia + " - max is " + maxClientCnxns ); } LOG.info("Accepted socket connection from " + sc.socket().getRemoteSocketAddress()); sc.configureBlocking(false); //使用轮询来将连接分派给某个 selector 线程 if (!selectorIterator.hasNext()) { selectorIterator = selectorThreads.iterator(); } SelectorThread selectorThread = selectorIterator.next(); if (!selectorThread.addAcceptedConnection(sc)) { throw new IOException( "Unable to add connection to selector queue" + (stopped ? " (shutdown in progress)" : "")); } acceptErrorLogger.flush(); } catch (IOException e) { acceptErrorLogger.rateLimitLog( "Error accepting new connection: " + e.getMessage()); fastCloseSock(sc); } return accepted; }
如代码注释所示,doAccept 方法主要做了两件事:
- 如果某个客户端连接过多则拒绝其建立新连接,防止少量客户端占用所有连接资源。
- 使用轮询来从 N 个 selector 线程中选出一个 selector 线程,并且调用 selectorThread.addAcceptedConnection(sc) 方法来将连接分派给该 selector 线程。调用该方法会把连接扔到这个 selector 线程的 acceptedQueue(类型为 LinkedBlockingQueue)中,然后调用 selector.wakeup() 唤醒 selector 进行处理。
selector 线程
selector 线程的 run 方法如下所示:
public void run() { try { while (!stopped) { try { //监听读写事件并处理 select(); //处理 accept 线程新分派的连接 processAcceptedConnections(); //更新连接监听事件 processInterestOpsUpdateRequests(); } catch (RuntimeException e) { LOG.warn("Ignoring unexpected runtime exception", e); } catch (Exception e) { LOG.warn("Ignoring unexpected exception", e); } } //...... } finally { closeSelector(); // 唤醒 accept 线程及其他线程,并通知 worker 线程退出 NIOServerCnxnFactory.this.stop(); LOG.info("selector thread exitted run method"); } }
可以看到,selector 线程主要做三件事:
- select():监听读写事件并处理;
- processAcceptedConnections():处理 accept 线程新分派的连接;
- processInterestOpsUpdateRequests():更新连接监听事件
其中在 select() 方法中,selector 线程会把有事件发生的连接封装成 IOWorkRequest 对象,然后调用 workerPool.schedule(workRequest) 来交给 worker 线程来处理。
worker 线程
worker 线程的核心处理逻辑在 IOWorkRequest 的 doWork() 中,如下所示:
public void doWork() throws InterruptedException { //如果 Channel 已经关闭则清理该 SelectionKey if (!key.isValid()) { selectorThread.cleanupSelectionKey(key); return; } //如果可读或可写,则调用 NIOServerCnxn.doIO 方法,通知 NIOServerCnxn 连接对象进行 IO 读写及处理 if (key.isReadable() || key.isWritable()) { cnxn.doIO(key); //如果已经 shutdown 则关闭连接 if (stopped) { cnxn.close(); return; } //如果 Channel 已经关闭则清理该 SelectionKey if (!key.isValid()) { selectorThread.cleanupSelectionKey(key); return; } //更新该会话的过期时间 touchCnxn(cnxn); } //已经处理完读写,重新标记该连接已准备好新的 select 事件监听 cnxn.enableSelectable(); //把该连接重新放到 selectThread 的 updateQueue 中,selectThread 会在处理处理完所有 Channel 的读写和新连接后,更新此 Channel 的注册监听事件 if (!selectorThread.addInterestOpsUpdateRequest(key)) { cnxn.close(); } }
具体逻辑见代码注释。
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论