Java 基础之 selector

发布于 2024-08-08 12:34:36 字数 17256 浏览 9 评论 0

注意:由于 jdk 对于底层实现的迭代,为了方便描述,本文源码会参考: AdoptOpenJDK/openjdk-jdk11: Mirror of the jdk/jdk11 Mercurial forest at OpenJDK (github.com) ,若无特殊说明均默认使用上面这个。

在我们之前的 socket 一节讲到了我们去 read 一个 inputstream 是阻塞的,那么有没有办法变成非阻塞的而且还最好能直接告诉我那些可读呢?

是可以的,这就是我们本文的重点——java.nio 的非阻塞 IO

非阻塞 IO 的核心在于使用一个 Selector 来管理多个通道,可以是 SocketChannel,也可以是 ServerSocketChannel,将各个通道注册到 Selector 上,指定监听的事件。

之后可以只用一个线程来轮询这个 Selector,看看上面是否有通道是准备好的,当通道准备好可读或可写,然后才去开始真正的读写,这样速度就很快了。我们就完全没有必要给每个通道都起一个线程。

Java 层

实例

还是老样子 写一个 echo 服务器的 demo

Selector selector = Selector.open();
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        serverSocketChannel.configureBlocking(false);
        //让 selector 监听它的连接到来的事件
        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
        serverSocketChannel.bind(new InetSocketAddress(4399));
        while (true){
            selector.select();
            Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
            while (iterator.hasNext()){
                SelectionKey selectionKey = iterator.next();
                if (selectionKey.isConnectable()){
                    ((SocketChannel) selectionKey.channel()).finishConnect();
                    //留空 这里的意义后面讲
                    continue;
                }
                if (selectionKey.isWritable()) {
                   //留空 这里的意义后面讲
                    continue;
                }
                if (selectionKey.isReadable()) {

                    SocketChannel channel = (SocketChannel) selectionKey.channel();
                    ByteBuffer buffer = ByteBuffer.allocateDirect(1024);
                    int c = channel.read(buffer);
                    //若读到-1 就是连接断开,这里就不做这么仔细了
                    if (c != -1){
                        buffer.flip();
                        channel.write(buffer);
                    }else {
                        //会取消与其关联的全部 selector 监听事件
                        channel.close();
                    }
                    continue;
                }
                if (selectionKey.isAcceptable()){
                    SocketChannel accept = ((ServerSocketChannel) selectionKey.channel()).accept();
                    //很重要
                    accept.configureBlocking(false);
                    //让 selector 监听它的数据到来的事件
                    accept.register(selector, SelectionKey.OP_READ);
                }
                iterator.remove();
            }
        }

你看完这些代码就可以弄明白到底怎么用了,好,让我们看点细节上面的东西

事件

OP_READ

这个就很好理解了,让 selector 监听这个 socket,若其可以被读取,或者读到末尾(read 返回 0),或者连接断开(read 返回-1),或者读取出现了错误都会触发这个事件,实际上我们大部分情况下只关心可被读取和断开两个情况,其他都是很少发生的边缘条件

OP_ACCEPT

若对应的 ServerSocket 的完成握手队列不为空就会触发这个事件

OP_WRITE

在之前的 socket 章节 我们提到了 socket buffer 若写满了,就会阻塞当前的线程直到可写为止,既然我们现在已经设置为非阻塞了,那我们直接写会阻塞吗?

不会,注意 write 的返回值,其代表了当前写入了多少数据,若为 0 则意味着 socket buffer 满了,这个时候再写实际上不会阻塞但也写不进去,这种时候就需要我们挂载这个事件监听了,等他来通知我们可写,而无需空转不停重试

实际上还有一种情况需要, shutdown(3) - Linux man page (die.net)

OP_CONNECT

实际上这个用于客户端多,首先,在 non-blocking 模式下调用 socketChannel.connect(new InetSocketAddress("127.0.0.1",8080)); 连接远程主机,如果连接能立即建立就像本地连接一样,该方法会立即返回 true ,否则该方法会立即返回 false ,然后系统底层进行三次握手建立连接。连接有两种结果,一种是成功连接,第二种是异常,但是 connect 方法已经返回,无法通过该方法的返回值或者是异常来通知用户程序建立连接的情况,所以由 OP_CONNECT 事件和 finishConnect 方法来通知用户程序。不管系统底层三次连接是否成功, selector 都会被唤醒继而触发 OP_CONNECT 事件,如果握手成功,并且该连接未被其他线程关闭, finishConnect 会返回 true ,然后就可以顺利的进行 channle 读写。如果网络故障,或者远程主机故障,握手不成功,用户程序可以通过 finishConnect 方法获得底层的异常通知,进而处理异常。

一些小细节

传输文件

FileChannel 提到了一个 transferTo 方法 给两个 channel 之间拷贝数据,实际上它也可以用于在网络中传输大块的数据,比如说传输文件

public void sendfile(FileChannel fileChannel,SocketChannel socketChannel) throws IOException {
        fileChannel.transferTo(0, fileChannel.size(), socketChannel);
}

当我们需要在网络中传输一个文件时,为了避免 CPU 拷贝就可以使用这个方法

其内部实现依赖于操作系统对 zero copy 技术的支持。在 unix 操作系统和各种 linux 的版本中,这种功能最终是通过 sendfile() 系统调用实现。

我来给大家证明一下,通过代码追踪可得到最后会触发到 FileChannelImpl 的 transferTo0 方法上面,直接去看对应的 c 实现:

1654179042207

其实就是调用 sendfile64

在内核为 2.4 或者以上版本的 linux 系统上,socket 缓冲区描述符将被用来满足这个需求。这个方式不仅减少了内核用户态间的切换,而且也省去了那次需要 cpu 参与的复制过程。 从用户角度来看依旧是调用 transferTo() 方法,但是其本质发生了变化:

  1. 调用 transferTo 方法后数据被 DMA 从文件复制到了内核的一个缓冲区中。
  2. 数据不再被复制到 socket 关联的缓冲区中了,仅仅是将一个描述符(包含了数据的位置和长度等信息)追加到 socket 关联的缓冲区中。DMA 直接将内核中的缓冲区中的数据传输给协议引擎,消除了仅剩的一次需要 cpu 周期的数据复制。
  3. 1650621923358

wakeup

实际上你会发现 selector::select 会导致线程阻塞起来,若想立刻唤醒对应线程该怎么办?

很简单调用 selector.wakeup 即可,或者直接中断对应阻塞的线程即可。

前者原理非常简单,其实你实例化一个 selector 这个上面实际上预先注册了一个 pipe 的读端的读事件,wakeup 只是向写端写了一个数据,这样就 selector 就捕获到了读端可读这个事件,自然也就停止阻塞返回了,每次 select 都会过滤掉这个读端事件,所以我们看不到

后者原理就是调用 select 时给 Thread 里面 blocker 赋值,相当于注册了一个调用 interrupt() 方法时的回调,这个回调就是调用 wakeup

1654177555794

然后我们看一下精简版本的 wakeup

public Selector wakeup() {
    //省去不必要用于同步的代码
    IOUtil.write1(fd1, (byte)0);      
    return this;
}

socket attachment

如果我们想给 socket 绑定一个对象的话,比如说绑定一个上次没写完的 ByteBuffer,类似于 epoll api 中的 epoll_data 中的 void *ptr

selectionKey.attach(new Object());
Object o = selectionKey.attachment();

看起来和 epoll_data 用法差不多,但是并不是通过这个实现的,思考一下就知道了由于 GC 挪动对象,所以肯定不能用堆外指针指向一个对象。

这个 attach 实际上具有 volatile 语义,所以跨线程的情况下能保证 attachment() 调用的可见性和因果性,这个 attachment 是和 selectionkey 绑定的,也就是说找到了 key 就可以找到这个 attachment

我们来看看 linux 下面是怎么实现的这个功能

1654177407712

就是建立了一个 fd->selectionKey 的映射,epoll 会告知我们触发的是哪个 fd,然我们就可以找到对应的 selectionKey,进而找到对应的 attachment 了,很巧妙。

线程安全

Selector 对于多个并发线程来说是安全的,听起来很不错对吧?

实际上不注意很容易导致死锁,比如说一个线程无限期等待 select 返回,另外一个线程去 register,这就会导致死锁。因为它们会争抢同一个 publishkey 对象作为 monitor(这个结论实际上来自于 jdk8 的源码) JDK-6446653

从 jdk11 来看,不存在这个问题,但是我们不能保证运行我们代码的环境不是 8,虽然现在有向更高迁移的趋势。

正确的思路应该是:

用个生产者消费者模型,把要 register 的 channel 放到队列中,selector 线程在每次 select 前先 register 队列中的 channel 即可,若 selector 线程在阻塞就再加一步 wakeup 即可,实际上 netty 和 jdk11 也是这样实现的。

最佳实践

一个线程一个 selector 用于轮询 fd,而且多个 selector 没有共享的监听 fd。

对于 selector 的 register 和 select 操作必须在同一个线程中,以上的目的在于避免竞态

各个平台的 selector 实现

受限于我看的源码水平,我只了解 linux 和 Windows 的实现

Linux:

Epoll,而且是水平触发模式——即若 socket 接收缓冲区(RCVBUF)不为空那么就会一直触发,这个没法更改是写死在 c 代码里面的,而且由于还是写死的问题,一次 select 出来的 就绪 事件最多是 1024 个。其效率是 O(m) m 为就绪的 fd 数目。

讲到这里,我们再来提一提 epoll 另一个模式——边缘触发,即缓冲区满才触发,若我们一次性没读完,那么下次事件就得等到缓冲区满才触发了。

Windows:

17 之前是基于 select 的,一次性只能监听 1024 个 fd,而且其效率是 O(n),n 为监听的 fd 数量,因为上限只有 1024 个,所以超过的部分会多开线程来监听,每 1024fd 开一个线程监听,每次的 select 方法调用都是统计一下各个线程汇报上来的消息。

17 开始是基于 wepoll 的,其底层是 IOCP——一种 AIO 实现,具体讨论请看 JDK-8266369 ,这种实现性能相较于过去好很多

Linux 的 C 实现

现在我们已经知道了 Linux 上的实现是基于 Epoll 的。

让我们先来看看 Epoll 的手册 The following system calls are provided to create and manage an epoll instance:

  • epoll_create(2) creates a new epoll instance and returns a file descriptor referring to that instance.
  • Interest in particular file descriptors is then registered via epoll_ctl(2), which adds items to the interest list of the epoll instance.
  • epoll_wait(2) waits for I/O events, blocking the calling thread if no events are currently available. (This system call can be thought of as fetching items from the ready list of the epoll instance.)

简单来说就是 epoll_create 创造一个 epoll 实例,epoll_ctl 将 fd 纳入其管理(比如说挂载事件),epoll_wait 等待触发一个事件。我们经常说 java 只是一层很薄的系统调用封装,从这里就可以窥见端倪。

我们先不看实现,思考一下这三个 api 可能是在哪调用的?epoll_ctl->selector::new ,epoll_ctl->socketChannel::register,epoll_wait->select::select

然后再看看代码验证一下

EpollSelectorImpl::new

EPollSelectorImpl(SelectorProvider sp) throws IOException {
        super(sp);
    //获取 epoll 实例
        this.epfd = EPoll.create();
        this.pollArrayAddress = EPoll.allocatePollArray(NUM_EPOLLEVENTS);

        try {
            //上文已经解释了 这个用于 wakeup 操作 并且给 pipe 设定为非阻塞
            long fds = IOUtil.makePipe(false);
            this.fd0 = (int) (fds >>> 32);
            this.fd1 = (int) fds;
        } catch (IOException ioe) {
            EPoll.freePollArray(pollArrayAddress);
            FileDispatcherImpl.closeIntFD(epfd);
            throw ioe;
        }

        // 将 pipe 的读端挂载读事件
        EPoll.ctl(epfd, EPOLL_CTL_ADD, fd0, EPOLLIN);
    }

//Epoll.c
JNIEXPORT jint JNICALL
Java_sun_nio_ch_EPoll_create(JNIEnv *env, jclass clazz) {
    /* size hint not used in modern kernels */
    int epfd = epoll_create(256);
    if (epfd < 0) {
        JNU_ThrowIOExceptionWithLastError(env, "epoll_create failed");
    }
    return epfd;
}

必要的地方已经做了标识,我们讲一讲一些小细节

EPoll.allocatePollArray(NUM_EPOLLEVENTS) 这个实际上就是申请了堆外的一段内存用于存放 epoll_event 的,我们最关心的 NUM_EPOLLEVENTS 这个参数决定了一次 select 最多可以获取到多少就绪事件

private static final int NUM_EPOLLEVENTS = Math.min(IOUtil.fdLimit(), 1024);
//IOUtil.c
JNIEXPORT jint JNICALL
Java_sun_nio_ch_IOUtil_fdLimit(JNIEnv *env, jclass this)
{
    struct rlimit rlp;
    //关注这个调用
    if (getrlimit(RLIMIT_NOFILE, &rlp) < 0) {
        JNU_ThrowIOExceptionWithLastError(env, "getrlimit failed");
        return -1;
    }
    if (rlp.rlim_max == RLIM_INFINITY ||
        rlp.rlim_max > (rlim_t)java_lang_Integer_MAX_VALUE) {
        return java_lang_Integer_MAX_VALUE;
    } else {
        return (jint)rlp.rlim_max;
    }
}

即当前进程允许打开的最大 fd 和 1024 中的最小值,即最大我们一口气可以 select 出 1024 个事件。

EpollSelectorImpl::processUpdateQueue

这个方法并不会直接被我们调用,在 jdk11 中对于 register 操作实际上投递到一个 updateKeys 队列中就返回了,epoll_ctl 并不是在这里调用的。而是在真正 select 操作前会排空这个队列(调用 processUpdateQueue 方法)里面的 SelectionKey,看看注册的事件是不是有更改,有更改就会调用 Epoll::ctl 来注册信息。

在此之前我们先认识一下 SelectionKeyImpl 中的两个字段

private volatile int interestOps;
private int registeredEvents;

当我们修改一个 SelectionKey 的注册事件时实际上是修改的 interestOps

而 registeredEvents 你可以理解为上一次感兴趣的事件(初始为 0)

然后我们将这些概念连接在一起,直接来看一段核心代码。

代码排版炸了,换个图片吧

1654267148632

感兴趣事件对应 code 为 0 那就是不关注它,即对应 epoll_ctl 文档的 EPOLL_CTL_DEL 参数

感兴趣事件对应 code 为 0 且 上一次 感兴趣事件对应 code 为 0,即意味着没有被 epoll 管理(可能是因为获取到这个 fd,也可能是上一个那个情况,被移除监听了),这个时候就加入到 epoll 内

感兴趣事件对应 code 为 0,且 上一次 感兴趣事件对应 code 也 为 0,说明在池内需要修改监听的事件,比如说我之前注册了读事件和写事件,写事件触发,我全写完了需要写的数据,需要取消挂载写事件,这种情况就走这个分支。

EpollSelectorImpl::doSelect

我们调用 selector::select 最终就会走到这个方法里面

还是老样子直接看核心代码

1654268213271

很简单嘛和 epoll_wait 签名一致

int epoll_wait(int epfd, struct epoll_event *events,
                      int maxevents, int timeout)

pollArrayAddress 就是我们在构造器里面申请的那一段堆外内存

EpollSelectorImpl::processEvents

这个方法呢就是 epoll_wait 之后调用的方法用于将 epoll_event 转换为 SelectionKey 的状态。我们直接来看核心代码

long event = EPoll.getEvent(pollArrayAddress, i);
//根据偏移量算触发事件的 fd 的值
int fd = EPoll.getDescriptor(event);
//fd 就是 pipe 读端和实际需要监听的无关 过滤掉
if (fd == fd0) {
    interrupted = true;
} else {
    SelectionKeyImpl ski = fdToKey.get(fd);
    //就是建立了一个 fd->selectionKey 的映射,epoll 会告知我们触发的是哪个 fd,然我们就可以找到对应的 selectionKey
    if (ski != null) {
        //获取实际上注册到 epoll 的事件 值
        int rOps = EPoll.getEvents(event);
        //这个就是将 epoll 对应的事件值转换为 SelectionKey 对应的值
        //这两者是有区别的,后面会讲
        numKeysUpdated += processReadyEvents(rOps, ski, action);
    }
}
// 对应 event 结构体
typedef union epoll_data { 
     void    *ptr;          
     int      fd;           
     uint32_t u32;          
     uint64_t u64;          
 } epoll_data_t;            

 struct epoll_event {       
     uint32_t     events;   
     epoll_data_t data;     
 };

SelectionKey 和 Epoll 的 interestOps 差异是怎么解决的?

最常见的给 Epoll 注册的事件是 POLLIN,其中有连接可以接收和 socket 可以读都是触发这个事件,那么 java 是怎么处理,将其分开来的?

核心就是上面提到的 processReadyEvents 方法,其会转发到对应 fd 关联的 SelectionKey,由这个 SelectKey 来 翻译 epoll 的事件到 SelectionKey 中

而 SelectKey 又关联了其对应的 channel,真正的翻译就会委托到 channel::translateAndUpdateReadyOps 中,由不同的 channel 实现来选择如何更新 SelectKey 的值。很明显 ServerSocketChannel 和 SocketChannel 不是一个类,它们的实现也不同。下面给出两者关于 EPOLLIN 的不同翻译的核心代码

//ServerSocketChannelImpl::translateReadyOps(int ops, int initialOps, SelectionKeyImpl ski)
if (((ops & Net.POLLIN) != 0) &&
    ((intOps & SelectionKey.OP_ACCEPT) != 0))
        newOps |= SelectionKey.OP_ACCEPT;
ski.nioReadyOps(newOps);

//SocketChannelImpl::translateReadyOps(int ops, int initialOps, SelectionKeyImpl ski)
if (((ops & Net.POLLIN) != 0) &&
    ((intOps & SelectionKey.OP_READ) != 0) && connected)
    newOps |= SelectionKey.OP_READ;

一目了然就是这样分开的——利用类的重写机制

你会发现 SocketChannelImpl 对应的实现里面翻译 POLLIN 内有个很有趣的判断

connected ,为什么会有这个?

实际上由于 Net.POLLCONN 和 Net.POLLOUT 在实现上面其实是一个值都是 POLLOUT(来自于 poll.h),为了判别 Connect 事件和 Write 事件才做的,我们来看一段代码

////SocketChannelImpl::translateReadyOps(int ops, int initialOps, SelectionKeyImpl ski)
if (((ops & Net.POLLCONN) != 0) &&
    ((intOps & SelectionKey.OP_CONNECT) != 0) && isConnectionPending())
    newOps |= SelectionKey.OP_CONNECT;
if (((ops & Net.POLLOUT) != 0) &&
    ((intOps & SelectionKey.OP_WRITE) != 0) && connected)
    newOps |= SelectionKey.OP_WRITE;

私货时间——loom 是怎么利用 Selector 的?

以 ServerSocket::accept 为例子:

最后会转发到 NioSocketImpl::accept(SocketImpl si) 上面

try {
    int n = 0;
    //这个 fd 实际上指的就是监听到端口的服务器 fd
    FileDescriptor fd = beginAccept();
    try {
        //类似于 serversocketChannel.confgureBlocking(false)
        configureNonBlockingIfNeeded(fd, remainingNanos > 0);
        if (remainingNanos > 0) {
            // accept with timeout
            n = timedAccept(fd, newfd, isaa, remainingNanos);
        } else {
            // accept, no timeout
            n = Net.accept(fd, newfd, isaa);
            while (IOStatus.okayToRetry(n) && isOpen()) {
                //核心在这里:若此时并没有立刻取到连接 就挂起当前线程
                park(fd, Net.POLLIN);
                n = Net.accept(fd, newfd, isaa);
            }
        }
    } finally {
        endAccept(n > 0);
        assert IOStatus.check(n);
    }
} finally {
    acceptLock.unlock();
}

private void park(FileDescriptor fd, int event, long nanos) throws IOException {
    Thread t = Thread.currentThread();
    if (t.isVirtual()) {
        Poller.poll(fdVal(fd), event, nanos, this::isOpen);
        if (t.isInterrupted()) {
            throw new InterruptedIOException();
        }
    } //省略
}
public static void poll(int fdVal, int event, long nanos, BooleanSupplier supplier)
    throws IOException
{
    assert nanos >= 0L;
    if (event == Net.POLLIN) {
        readPoller(fdVal).poll(fdVal, nanos, supplier);
    } //省略
}

简单来说 就是将当前的 fd 与线程用一个 Map 关联起来,然后在交由一个特殊的读轮询器线程,在其 selector 上面注册。当这个线程 select 到对应的 fd 时再通过 Lockpark.unpark 对应的虚拟线程让其重新加入调度。

也就是说实际上我们原来需要写的 selector 交由 jdk 实现了,这样阻塞式的 socket api 也可以获得到非阻塞的性能了

解析视频在这里:loom 的 java 层面的实现

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

上一篇:

下一篇:

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。
列表为空,暂无数据

关于作者

0 文章
0 评论
23 人气
更多

推荐作者

忆伤

文章 0 评论 0

眼泪也成诗

文章 0 评论 0

zangqw

文章 0 评论 0

旧伤慢歌

文章 0 评论 0

qq_GlP2oV

文章 0 评论 0

旧时模样

文章 0 评论 0

    我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
    原文