Zookeeper 源码分析 - NIOServerCnxnFactory

发布于 2023-11-06 12:45:03 字数 6201 浏览 32 评论 0

Zookeeper 可以说是业界最流行的分布式协调解决方案,其源码值得我们好好静下心来学习和研究。

这篇文章主要分析 NIOServerCnxnFactory 这个类。NIOServerCnxnFactory 和 NettyServerCnxnFactory 是 Zookeeper 服务端用来处理连接的核心类,前者基于 NIO,后者基于 Netty 框架。废话少说,让我们一起来看下 NIOServerCnxnFactory 这个类是如何实现的:)。

NIOServerCnxnFactory

NIOServerCnxnFactory 基于 NIO 实现了一个多线程的 ServerCnxnFactory,线程间的通信都是通过 queue 来完成的。NIOServerCnxnFactory 包含的线程如下:

  • 1 个 accept 线程,用来监听端口并接收连接,然后把该连接分派给 selector 线程。
  • N 个 selecotr 线程,每个 selctor 线程平均负责 1/N 的连接。使用 N 个 selector 线程的原因在于,在大量连接的场景下,select() 操作本身可能会成为性能瓶颈。
  • N 个 worker 线程,用来负责 socket 的读写。如果 N 为 0,那么 selector 线程自身会进行 socket 读写。
  • 1 个管理连接的线程,用来关闭空闲而且没有建立 session 的连接。

NIOServerCnxnFactory 的启动入口为 startup 方法,如下所示:

public void startup(ZooKeeperServer zks, boolean startServer)
        throws IOException, InterruptedException {
    //自身的启动逻辑
    start();
    //设置 zkServer
    setZooKeeperServer(zks);
    if (startServer) {
        //启动 zkServer
        zks.startdata();
        zks.startup();
    }
}

start() 方法包含自身的启动逻辑,而 zks.startdata() 和 zks.startup() 用来启动 zkServer。NIOServerCnxnFactory 是用来管理连接的,而数据处理逻辑则由 zkServer 完成。start() 方法的逻辑如下所示:

public void start() {
    stopped = false;
    //worker 线程服务,用来进行 socket 的 I/O
    if (workerPool == null) {
        workerPool = new WorkerService(
            "NIOWorker", numWorkerThreads, false);
    }
    //selector 线程,用来监听 socket 事件
    for(SelectorThread thread : selectorThreads) {
        if (thread.getState() == Thread.State.NEW) {
            thread.start();
        }
    }
    // accept 线程
    if (acceptThread.getState() == Thread.State.NEW) {
        acceptThread.start();
    }
    // 连接管理线程
    if (expirerThread.getState() == Thread.State.NEW) {
        expirerThread.start();
    }
}

可以看到,start 方法主要生成或启动上述的 accept 线程、selector 线程、worker 线程和连接管理线程。

accept 线程

accept 线程的 run() 方法如下:

public void run() {
  try {
     //判断是否需要退出
    while (!stopped && !acceptSocket.socket().isClosed()) {
      try {
        //监听连接事件,并建立连接
        select();
      } catch (RuntimeException e) {
        LOG.warn("Ignoring unexpected runtime exception", e);
      } catch (Exception e) {
        LOG.warn("Ignoring unexpected exception", e);
      }
    }
  } finally {
    //关闭 selector
    closeSelector();
  if (!reconfiguring) {
    //唤醒 selector 线程并通知 worker 线程关闭
    NIOServerCnxnFactory.this.stop();
  }
  LOG.info("accept thread exitted run method");
}
}​

accept 线程主要监听连接事件,并建立连接,并分派给 selector。在退出时,关闭它自身的 selector,然后唤醒用来进行 socket I/O 的 selector 线程,最后通知 worker 线程退出。

accept 线程在 select 方法中监听连接事件,然后进入 doAccept() 方法建立连接,分派给 selector 线程,doAccept() 方法如下所示:

private boolean doAccept() {
  boolean accepted = false;
  SocketChannel sc = null;
  try {
    //建立连接
    sc = acceptSocket.accept();
    accepted = true;
    //防止来自一个 IP 的连接是否过多
    InetAddress ia = sc.socket().getInetAddress();
    int cnxncount = getClientCnxnCount(ia);
    if (maxClientCnxns > 0 && cnxncount >= maxClientCnxns){
      throw new IOException("Too many connections from " + ia
                  + " - max is " + maxClientCnxns );
    }
    LOG.info("Accepted socket connection from "
         + sc.socket().getRemoteSocketAddress());
    sc.configureBlocking(false);
  //使用轮询来将连接分派给某个 selector 线程
  if (!selectorIterator.hasNext()) {
    selectorIterator = selectorThreads.iterator();
  }
  SelectorThread selectorThread = selectorIterator.next();
  if (!selectorThread.addAcceptedConnection(sc)) {
    throw new IOException(
      "Unable to add connection to selector queue"
      + (stopped ? " (shutdown in progress)" : ""));
  }
  acceptErrorLogger.flush();
} catch (IOException e) {
  acceptErrorLogger.rateLimitLog(
    "Error accepting new connection: " + e.getMessage());
  fastCloseSock(sc);
}
return accepted;
}​

如代码注释所示,doAccept 方法主要做了两件事:

  • 如果某个客户端连接过多则拒绝其建立新连接,防止少量客户端占用所有连接资源。
  • 使用轮询来从 N 个 selector 线程中选出一个 selector 线程,并且调用 selectorThread.addAcceptedConnection(sc) 方法来将连接分派给该 selector 线程。调用该方法会把连接扔到这个 selector 线程的 acceptedQueue(类型为 LinkedBlockingQueue)中,然后调用 selector.wakeup() 唤醒 selector 进行处理。

selector 线程

selector 线程的 run 方法如下所示:

public void run() {
    try {
        while (!stopped) {
            try {
                //监听读写事件并处理
                select();
            //处理 accept 线程新分派的连接
            processAcceptedConnections();
            //更新连接监听事件
            processInterestOpsUpdateRequests();
        } catch (RuntimeException e) {
            LOG.warn("Ignoring unexpected runtime exception", e);
        } catch (Exception e) {
            LOG.warn("Ignoring unexpected exception", e);
        }
    }
//......
} finally {
    closeSelector();
    // 唤醒 accept 线程及其他线程,并通知 worker 线程退出
    NIOServerCnxnFactory.this.stop();
    LOG.info("selector thread exitted run method");
}
}​

可以看到,selector 线程主要做三件事:

  • select():监听读写事件并处理;
  • processAcceptedConnections():处理 accept 线程新分派的连接;
  • processInterestOpsUpdateRequests():更新连接监听事件

其中在 select() 方法中,selector 线程会把有事件发生的连接封装成 IOWorkRequest 对象,然后调用 workerPool.schedule(workRequest) 来交给 worker 线程来处理。

worker 线程

worker 线程的核心处理逻辑在 IOWorkRequest 的 doWork() 中,如下所示:

public void doWork() throws InterruptedException {
    //如果 Channel 已经关闭则清理该 SelectionKey
    if (!key.isValid()) {
        selectorThread.cleanupSelectionKey(key);
        return;
    }
    //如果可读或可写,则调用 NIOServerCnxn.doIO 方法,通知 NIOServerCnxn 连接对象进行 IO 读写及处理
    if (key.isReadable() || key.isWritable()) {
        cnxn.doIO(key);
    //如果已经 shutdown 则关闭连接
    if (stopped) {
        cnxn.close();
        return;
    }
    //如果 Channel 已经关闭则清理该 SelectionKey
    if (!key.isValid()) {
        selectorThread.cleanupSelectionKey(key);
        return;
    }
    //更新该会话的过期时间
    touchCnxn(cnxn);
}
//已经处理完读写,重新标记该连接已准备好新的 select 事件监听
cnxn.enableSelectable();
//把该连接重新放到 selectThread 的 updateQueue 中,selectThread 会在处理处理完所有 Channel 的读写和新连接后,更新此 Channel 的注册监听事件
if (!selectorThread.addInterestOpsUpdateRequest(key)) {
    cnxn.close();
}
}​

具体逻辑见代码注释。

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。
列表为空,暂无数据

关于作者

柳若烟

暂无简介

0 文章
0 评论
613 人气
更多

推荐作者

内心激荡

文章 0 评论 0

JSmiles

文章 0 评论 0

左秋

文章 0 评论 0

迪街小绵羊

文章 0 评论 0

瞳孔里扚悲伤

文章 0 评论 0

    我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
    原文