Netty 源码分析

发布于 2024-01-01 22:44:41 字数 52287 浏览 53 评论 0

一. 服务端启动流程

NIO 原生服务端启动代码:

//1. 创建 Selector,管理多个 Channel
Selector selector = Selector.open();

ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.configureBlocking(false);
serverSocketChannel.bind(new InetSocketAddress(8080));
// 2. 建立 ServerSocketChannel 与 Selector 的联系。SelectionKey 用于事件发生后,通过它可以知道是哪个 channel 的事件,以及事件类型
SelectionKey sscSelectionKey = serverSocketChannel.register(selector, 0, null);
// 3. 关注 accept 事件
sscSelectionKey.interestOps(SelectionKey.OP_ACCEPT);

Netty 服务端启动代码:

        // 1、启动器,负责装配 netty 组件,启动服务器
        new ServerBootstrap()
                // 2、创建 NioEventLoopGroup,可以简单理解为 线程池 + Selector
                .group(new NioEventLoopGroup())
                // 3、选择服务器的 ServerSocketChannel 实现
                .channel(NioServerSocketChannel.class)
            	// 4、绑定端口,并启动服务端
                .bind(8000);

Netty 服务端启动流程本质上就是对 NIO 基础 API 的封装。可以将 Netty 启动流程简化成如下代码:

// netty 中使用 NioEventLoopGroup (简称 nio boss 线程)来封装线程和 selector
Selector selector = Selector.open(); 

// 创建 NioServerSocketChannel,同时会初始化它关联的 handler,以及为原生 ssc 存储 config
NioServerSocketChannel attachment = new NioServerSocketChannel();

// 创建 NioServerSocketChannel 时,创建了 java 原生的 ServerSocketChannel
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); 
serverSocketChannel.configureBlocking(false);

// 启动 nio boss 线程执行接下来的操作

//注册(仅关联 selector 和 NioServerSocketChannel),未关注事件
SelectionKey selectionKey = serverSocketChannel.register(selector, 0, attachment);

// head -> 初始化器 -> ServerBootstrapAcceptor -> tail,初始化器是一次性的,只为添加 acceptor

// 绑定端口
serverSocketChannel.bind(new InetSocketAddress(8080));

// 触发 channel active 事件,在 head 中关注 op_accept 事件
selectionKey.interestOps(SelectionKey.OP_ACCEPT);
  • 获得选择器 Selector,Netty 中使用 NioEventloopGroup 中的 NioEventloop 封装了线程和选择器
  • 创建 NioServerSocketChannel ,该 Channel作为附件添加到 ServerSocketChannel
  • 创建 ServerSocketChannel ,将其设置为非阻塞模式,并注册到 Selector 中,此时未关注事件,但是添加了附件NioServerSocketChannel
  • 绑定端口
  • 通过 interestOps 设置感兴趣的事件

1.1 bind

选择器 Selector 的创建是在 NioEventloopGroup 中完成的。NioServerSocketChannel 与 ServerSocketChannel 的创建、ServerSocketChannel 注册到 Selector 中以及绑定操作都是由 bind 方法完成的 所以服务器启动的入口便是 io.netty.bootstrap.ServerBootstrap.bind

public ChannelFuture bind(SocketAddress localAddress) {
	validate();
	return doBind(ObjectUtil.checkNotNull(localAddress, "localAddress"));
}

1.2 doBind

真正完成初始化、注册以及绑定的方法是 io.netty.bootstrap.AbstractBootstrap.doBind dobind 方法在主线程中执行

private ChannelFuture doBind(final SocketAddress localAddress) {
    // 负责 NioServerSocketChannel 和 ServerSocketChannel 的创建
    // ServerSocketChannel 的注册工作
    // init 由 main 线程完成,regisetr 由 NIO 线程完成
    final ChannelFuture regFuture = initAndRegister();
    final Channel channel = regFuture.channel();
    if (regFuture.cause() != null) {
        return regFuture;
    }

    // 因为 register 操作是异步的
    // 所以要判断主线程执行到这里时,register 操作是否已经执行完毕
    if (regFuture.isDone()) {
        // At this point we know that the registration was complete and successful.
        ChannelPromise promise = channel.newPromise();
        
        // 执行 doBind0 绑定操作
        doBind0(regFuture, channel, localAddress, promise);
        return promise;
    } else {
        // Registration future is almost always fulfilled already, but just in case it's not.
        // 如果 register 操作还没执行完,就会到这个分支中来
        final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
        
        // 添加监听器,NIO 线程异步进行 doBind0 操作
        regFuture.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                Throwable cause = future.cause();
                if (cause != null) {
                    // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
                    // IllegalStateException once we try to access the EventLoop of the Channel.
                    promise.setFailure(cause);
                } else {
                    // Registration was successful, so set the correct executor to use.
                    // See   https://github.com/netty/netty/issues/2586  
                    promise.registered();

                    doBind0(regFuture, channel, localAddress, promise);
                }
            }
        });
        return promise;
    }
}
  • doBind() 中有两个重要方法 initAndRegister()doBind0(regFuture, channel, localAddress, promise)
  • initAndRegister主要负责 NioServerSocketChannel 和 ServerSocketChannel 的创建(主线程中完成)与 ServerSocketChannel 注册(NIO 线程中完成)工作
  • doBind0则负责连接的创建工作

1.3 initAndRegisterd

final ChannelFuture initAndRegister() {
    Channel channel = null;
    try {
        channel = channelFactory.newChannel();
        init(channel);
    } catch (Throwable t) {
        if (channel != null) {
            // channel can be null if newChannel crashed (eg SocketException("too many open files"))
            channel.unsafe().closeForcibly();
            // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
            return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
        }
        // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
        return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);
    }

    ChannelFuture regFuture = config().group().register(channel);
    if (regFuture.cause() != null) {
        if (channel.isRegistered()) {
            channel.close();
        } else {
            channel.unsafe().closeForcibly();
        }
    }

    // If we are here and the promise is not failed, it's one of the following cases:
    // 1) If we attempted registration from the event loop, the registration has been completed at this point.
    //    i.e. It's safe to attempt bind() or connect() now because the channel has been registered.
    // 2) If we attempted registration from the other thread, the registration request has been successfully
    //    added to the event loop's task queue for later execution.
    //    i.e. It's safe to attempt bind() or connect() now:
    //         because bind() or connect() will be executed *after* the scheduled registration task is executed
    //         because register(), bind(), and connect() are all bound to the same thread.

    return regFuture;
}

1.3.1 newChannel

// io.netty.channel.ReflectiveChannelFactory#newChannel
@Override
public T newChannel() {
    try {
        // 通过反射调用 NioServerSocketChannel 的构造方法
        // 创建 NioServerSocketChannel 对象
        return constructor.newInstance();
    } catch (Throwable t) {
        throw new ChannelException("Unable to create Channel from class " + constructor.getDeclaringClass(), t);
    }
}

在内部会通过反射调用 NioServerSocketChannel 的构造方法,创建 NioServerSocketChannel 对象:

public NioServerSocketChannel() {
    // 创建了 ServerSocketChannel 实例
    this(newSocket(DEFAULT_SELECTOR_PROVIDER));
}

在创建 NioServerSocketChannel 时,会在构造方法里面同步创建 NIO 原生的 ServerSocketChannel

1.3.2 init

@Override
void init(Channel channel) {
   	...
		
    // NioServerSocketChannel 的 Pipeline    
    ChannelPipeline p = channel.pipeline();
		
    ...

    // 向 Pipeline 中添加了一个 handler,该 handler 等待被调用
    p.addLast(new ChannelInitializer<Channel>() {
        @Override
        // register 之后才调用该方法
        public void initChannel(final Channel ch) {
            final ChannelPipeline pipeline = ch.pipeline();
            
            // 创建 handler 并加入到 pipeline 中
            ChannelHandler handler = config.handler();
            if (handler != null) {
                pipeline.addLast(handler);
            }

            ch.eventLoop().execute(new Runnable() {
                @Override
                public void run() {
                    // 添加新的 handler,在发生 Accept 事件后建立连接
                    pipeline.addLast(new ServerBootstrapAcceptor(
                            ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                }
            });
        }
    });
}
  • 该方法向 NioServerSocketChannel 中添加了两个 handler,添加操作在 register 之后被执行
    • 一个 handler 负责设置配置
    • 一个 handler 负责发生 Accepet 事件后建立连接

1.3.3 register

init 执行完毕后,便执行 ChannelFuture regFuture = config().group().register(channel) 操作 该方法最终调用的是 promise.channel().unsafe().register(this, promise) 方法 promise.channel().unsafe().register(this, promise)

@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
    ...

    // 获取 EventLoop
    AbstractChannel.this.eventLoop = eventLoop;

   	// 此处完成了由 主线程 到 NIO 线程 的切换
    // eventLoop.inEventLoop() 用于判断当前线程是否为 NIO 线程
    if (eventLoop.inEventLoop()) {
        register0(promise);
    } else {
        try {
            // 向 NIO 线程中添加任务
            eventLoop.execute(new Runnable() {
                @Override
                public void run() {
                    // 该方法中会执行 doRegister
                    // 执行真正的注册操作
                    register0(promise);
                }
            });
        } catch (Throwable t) {
           ...
        }
    }
}

register0 方法

//io.netty.channel.AbstractChannel.AbstractUnsafe#register0
private void register0(ChannelPromise promise) {
    try {
       	...
            
        // 执行真正的注册操作
        doRegister();
        neverRegistered = false;
        registered = true;

        // Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the
        // user may already fire events through the pipeline in the ChannelFutureListener.
        
        // 调用 init 中的 initChannel 方法
        pipeline.invokeHandlerAddedIfNeeded();

        ...
    } catch (Throwable t) {
        ...
    }
}

doRegister 方法

// io.netty.channel.nio.AbstractNioChannel#doRegister
@Override
protected void doRegister() throws Exception {
    boolean selected = false;
    for (;;) {
        try {
            // javaChannel() 即为 ServerSocketChannel
            // eventLoop().unwrappedSelector() 获取 eventLoop 中的 Selector
            // this 为 NIOServerSocketChannel,作为附件
            selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
            return;
        } catch (CancelledKeyException e) {
            ...
           
        }
    }
}

回调 initChannel

//io.netty.bootstrap.ServerBootstrap#init
@Override
void init(Channel channel) throws Exception {
    ...

    p.addLast(new ChannelInitializer<Channel>() {
        @Override
        public void initChannel(final Channel ch) throws Exception {
            final ChannelPipeline pipeline = ch.pipeline();
            ChannelHandler handler = config.handler();
            if (handler != null) {
                pipeline.addLast(handler);
            }
		    // 添加新任务,任务负责添加 handler
			// 该 handler 负责发生 Accepet 事件后建立连接
            ch.eventLoop().execute(new Runnable() {
                @Override
                public void run() {
                    pipeline.addLast(new ServerBootstrapAcceptor(
                            ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                }
            });
        }
    });
}

Register 主要完成了以下三个操作

  • 完成了主线程到 NIO 的线程切换
    • 通过 eventLoop.inEventLoop() 进行线程判断,判断当前线程是否为 NIO 线程
    • 切换的方式为让 eventLoop 执行 register 的操作
    • register 的操作在 NIO 线程中完成
  • 调用 doRegister 方法
    // javaChannel() 即为 ServerSocketChannel
    // eventLoop().unwrappedSelector() 获取 eventLoop 中的 Selector
    // this 为 NIOServerSocketChannel,作为附件
    selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
    
    • 将 ServerSocketChannel 注册到 EventLoop 的 Selector 中
    • 此时还未关注事件
    • 添加 NioServerSocketChannel 附件
  • 通过 invokeHandlerAddedIfNeeded 调用 init 中的 initChannel 方法
    • initChannel 方法主要创建了 两个 handler
      • 一个 handler 负责设置配置
      • 一个 handler 负责发生 Accept 事件后建立连接

1.4 doBind0

1.4.1 绑定端口

doRegisterinvokeHandlerAddedIfNeeded 操作中的完成后,会调用 safeSetSuccess(promise) 方法,向 Promise 中设置执行成功的结果。此时 doBind 方法中由 initAndRegister 返回的 ChannelFuture 对象 regFuture 便会由 NIO 线程异步执行 doBind0 绑定操作

//io.netty.bootstrap.AbstractBootstrap#doBind
// initAndRegister 为异步方法,会返回 ChannelFuture 对象
final ChannelFuture regFuture = initAndRegister();
regFuture.addListener(new ChannelFutureListener() {
    @Override
    public void operationComplete(ChannelFuture future) throws Exception {
        Throwable cause = future.cause();
        if (cause != null) {
            // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
            // IllegalStateException once we try to access the EventLoop of the Channel.
            promise.setFailure(cause);
        } else {
            // Registration was successful, so set the correct executor to use.
            // See   https://github.com/netty/netty/issues/2586  
            promise.registered();
            // 如果没有异常,则执行绑定操作
            doBind0(regFuture, channel, localAddress, promise);
        }
    }
});

doBind0 最底层调用的是 ServerSocketChannel 的 bind 方法 NioServerSocketChannel.doBind 方法通过该方法,绑定了对应的端口。

// io.netty.channel.socket.nio.NioServerSocketChannel#doBind
@SuppressJava6Requirement(reason = "Usage guarded by java version check")
@Override
protected void doBind(SocketAddress localAddress) throws Exception {
    if (PlatformDependent.javaVersion() >= 7) {
        // 调用 ServerSocketChannel 的 bind 方法,绑定端口
        javaChannel().bind(localAddress, config.getBacklog());
    } else {
        javaChannel().socket().bind(localAddress, config.getBacklog());
    }
}

1.4.2 关注事件

在绑定端口操作完成后,会判断各种所有初始化操作是否已经完成,若完成,则会添加 ServerSocketChannel 感兴趣的事件

if (!wasActive && isActive()) {
    invokeLater(new Runnable() {
        @Override
        public void run() {
            pipeline.fireChannelActive();
        }
    });
}

最终在 AbstractNioChannel.doBeginRead 方法中,会添加 ServerSocketChannel 添加 Accept 事件

@Override
protected void doBeginRead() throws Exception {
    // Channel.read() or ChannelHandlerContext.read() was called
    final SelectionKey selectionKey = this.selectionKey;
    if (!selectionKey.isValid()) {
        return;
    }
    readPending = true;
    final int interestOps = selectionKey.interestOps();
    // 如果 ServerSocketChannel 没有关注 Accept 事件
    if ((interestOps & readInterestOp) == 0) {
        // 则让其关注 Accepet 事件
        // readInterestOp 取值是 16
        // 在 NioServerSocketChannel 创建时初始化
        selectionKey.interestOps(interestOps | readInterestOp);
    }
}

注意:此处设置 interestOps 时使用的方法,避免覆盖关注的其他事件

  • 首先获取 Channel 所有感兴趣的事件
    final int interestOps = selectionKey.interestOps();
    
  • 然后再设置其感兴趣的事件
    selectionKey.interestOps(interestOps | readInterestOp);
    

1.5 总结

通过上述步骤,完成了

  • NioServerSocketChannel 与 ServerSocketChannel 的创建
  • ServerSocketChannel 绑定到 EventLoop 的 Selecot 中,并添加 NioServerSocketChannel 附件
  • 绑定了对应的端口
  • 关注了 Accept 事件

二. NioEventLoop 剖析

2.1 组成

NioEventLoop 的重要组成部分有三个

  • Selector
    public final class NioEventLoop extends SingleThreadEventLoop {
        
        ...
            
        // selector 中的 selectedKeys 是基于数组的
        // unwrappedSelector 中的 selectedKeys 是基于 HashSet 的    
        private Selector selector;
        private Selector unwrappedSelector;
        private SelectedSelectionKeySet selectedKeys;
        
        ...
    }
    
  • Thread 与 TaskQueue
    public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {
        // 任务队列
        private final Queue<Runnable> taskQueue;
    
        // 线程
        private volatile Thread thread;
    }
    

2.1.1 Selector 的创建

Selector 是在 NioEventLoop 的构造方法中被创建的

NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider, SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler, EventLoopTaskQueueFactory queueFactory) {
    
        ...
           
        // 初始化 selector,初始化过程在 openSelector 中
        final SelectorTuple selectorTuple = openSelector();
        this.selector = selectorTuple.selector;
        this.unwrappedSelector = selectorTuple.unwrappedSelector;
}


private SelectorTuple openSelector() {
    final Selector unwrappedSelector;
    try {
        // 此处等同于 Selector.open() 方法
        // 创建了 unwrappedSelector 对象
        unwrappedSelector = provider.openSelector();
    } catch (IOException e) {
        throw new ChannelException("failed to open a new selector", e);
    }
}

NioEventLoop 的构造方法中,调用了 openSelector() 方法, 该方法会返回一个 SelectorTuple对象,该方法是创建 Selector 的核心方法openSelector() 方法内部调用了

unwrappedSelector = provider.openSelector();

获得了 Selector 对象 unwrappedSelector 后面会通过反射,修改 unwrappedSelector 中 SelectedKeys 的实现,然后构造一个 SelectedSelectionKeySetSelector 包装类,包装原始的 unwrappedSelector。最后通过 SelectorTuple 的构造方法,将该 Selector 的值赋给 SelectorTuple 类中的 selector 与 unwrappedSelector

private static final class SelectorTuple {
    final Selector unwrappedSelector;
    final Selector selector;

    SelectorTuple(Selector unwrappedSelector) {
        this.unwrappedSelector = unwrappedSelector;
        this.selector = unwrappedSelector;
    }

    /**
    * 一般调用的是这个构造方法
    */
    SelectorTuple(Selector unwrappedSelector, Selector selector) {
        this.unwrappedSelector = unwrappedSelector;
        this.selector = selector;
    }
}

再通过 NioEventLoop 的构造方法,将 SelectorTuple 中的 Selector 赋值给 NioEventLoop 中的 Selector

2.1.2 两个 Selector 的区别

NioEventLoop 中有 selector 和 unwrappedSelector 两个 Selector,它们的区别主要在于 SelectedKeys 的数据结构

  • selector 中的 SelectedKeys 是基于数组
  • unwrappedSelector 中的是基于 HashSet

这样做的主要目的是,数组的遍历效率要高于 HashSet

private SelectorTuple openSelector() {
    final Selector unwrappedSelector;
    try {
        unwrappedSelector = provider.openSelector();
    } catch (IOException e) {
        throw new ChannelException("failed to open a new selector", e);
    }

    ...
    
    // 获得基于数组的 selectedKeySet 实现
    final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();


    Object maybeException = AccessController.doPrivileged(new PrivilegedAction<Object>() {
        @Override
        public Object run() {
            try {
                // 通过反射拿到 unwrappedSelector 中的 selectedKeys 属性
                Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");
                Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");

                ...
	
                // 暴力反射,修改私有属性
                Throwable cause = ReflectionUtil.trySetAccessible(selectedKeysField, true);
                if (cause != null) {
                    return cause;
                }
                cause = ReflectionUtil.trySetAccessible(publicSelectedKeysField, true);
                if (cause != null) {
                    return cause;
                }

                // 替换为基于数组的 selectedKeys 实现
                selectedKeysField.set(unwrappedSelector, selectedKeySet);
                publicSelectedKeysField.set(unwrappedSelector, selectedKeySet);
                return null;
            } catch (NoSuchFieldException e) {
                return e;
            } catch (IllegalAccessException e) {
                return e;
            }
        }
    });

    selectedKeys = selectedKeySet;
    
    // 调用构造函数,创建 unwrappedSelector 与 selector
    return new SelectorTuple(unwrappedSelector,
                             new SelectedSelectionKeySetSelector(unwrappedSelector, selectedKeySet));
}

获得数组实现 SelectedKeys 的 Selector 的原理是反射,主要步骤如下

  • 获得基于数组的 selectedKeySet 实现
    // 获得基于数组的 selectedKeySet 实现
    final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();
    
    SelectedSelectionKeySet() {
    	keys = new SelectionKey[1024];
    }
    
  • 通过反射拿到 unwrappedSelector 中的 SelectedKeySet 并将其替换为 selectedKeySet
  • 通过 Selector 的构造方法获得 selector
    new SelectedSelectionKeySetSelector(unwrappedSelector, selectedKeySet);
    
  • 通过 SelectorTuple 的构造方法获得拥有两种 Selector 的 SelectorTuple 对象,并返回给 NioEventLoop
    // 调用构造函数,创建 unwrappedSelector 与 selector
    return new SelectorTuple(unwrappedSelector, new SelectedSelectionKeySetSelector(unwrappedSelector, selectedKeySet));
    

2.2 Nio 线程的启动机制

2.2.1 启动

NioEventLoop 中的线程,在首次执行任务时,才会被创建,且只会被创建一次 测试代码

public class TestNioEventLoop {
    public static void main(String[] args) {
        EventLoop eventLoop = new NioEventLoopGroup().next();
        // 使用 NioEventLoop 执行任务
        eventLoop.execute(()->{
            System.out.println("hello");
        });
    }
}

进入 execute 执行任务

@Override
public void execute(Runnable task) {
    // 检测传入的任务是否为空,为空会抛出 NullPointerException
    ObjectUtil.checkNotNull(task, "task");
    // 执行任务
    // 此处判断了任务是否为懒加载任务,wakesUpForTask 的返回值只会为 true
    execute(task, !(task instanceof LazyRunnable) && wakesUpForTask(task));
}

进入上述代码的 execute 方法

private void execute(Runnable task, boolean immediate) {
    // 判断当前线程是否为 NIO 线程
    // 判断方法为 return thread == this.thread;
    // this.thread 即为 NIO 线程,首次执行任务时,其为 null
    boolean inEventLoop = inEventLoop();
    
    // 向任务队列 taskQueue 中添加任务
    addTask(task);
    
    // 当前线程不是 NIO 线程,则进入 if 语句
    if (!inEventLoop) {
        // 启动 NIO 线程的核心方法
        startThread();
        
        ...
        
    }
	
    // 有任务需要被执行时,唤醒阻塞的 NIO 线程
    if (!addTaskWakesUp && immediate) {
        wakeup(inEventLoop);
    }
}

进入 startThread 方法

private void startThread() {
    // 查看 NIO 线程状态是否为未启动
    // 该 if 代码块只会执行一次
    // state 一开始的值就是 ST_NOT_STARTED
    // private volatile int state = ST_NOT_STARTED;
    if (state == ST_NOT_STARTED) {
        // 通过原子属性更新器将状态更新为启动(ST_STARTED)
        if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
            boolean success = false;
            try {
                // 执行启动线程
                doStartThread();
                success = true;
            } finally {
                if (!success) {
                    STATE_UPDATER.compareAndSet(this, ST_STARTED, ST_NOT_STARTED);
                }
            }
        }
    }
}

进入 doStartThread ,真正创建 NIO 线程并执行任务

private void doStartThread() {
    assert thread == null;
    // 创建 NIO 线程并执行任务
    executor.execute(new Runnable() {
        @Override
        public void run() {
            // thread 即为 NIO 线程
            thread = Thread.currentThread();
            if (interrupted) {
                thread.interrupt();
            }

            boolean success = false;
            updateLastExecutionTime();
            try {
                // 执行内部 run 方法
                SingleThreadEventExecutor.this.run();
                success = true;
            } 
            
            ...
    });
}

通过 SingleThreadEventExecutor.this.run() 执行传入的任务(task) 该 run 方法是NioEvnetLoop 的 run 方法

//io.netty.channel.nio.NioEventLoop#run
@Override
protected void run() {
    int selectCnt = 0;
    // 死循环,不断地从任务队列中获取各种任务来执行
    for (;;) {	
      	// 执行各种任务
   		try {
            int strategy;
            try {
                strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());
                switch (strategy) {
                case SelectStrategy.CONTINUE:
                    continue;

                case SelectStrategy.BUSY_WAIT:
                    // fall-through to SELECT since the busy-wait is not supported with NIO

                case SelectStrategy.SELECT:
                    long curDeadlineNanos = nextScheduledTaskDeadlineNanos();
                    if (curDeadlineNanos == -1L) {
                        curDeadlineNanos = NONE; // nothing on the calendar
                    }
                    nextWakeupNanos.set(curDeadlineNanos);
                    try {
                        if (!hasTasks()) {
                            strategy = select(curDeadlineNanos);
                        }
                    } finally {
                        // This update is just to help block unnecessary selector wakeups
                        // so use of lazySet is ok (no race condition)
                        nextWakeupNanos.lazySet(AWAKE);
                    }
                    // fall through
                default:
                }
       		}
    	}
	}

2.2.2 唤醒

NioEvnetLoop 需要 IO 事件、普通任务以及定时任务,任务在 run 方法的 for 循环中

@Override
protected void run() {
    int selectCnt = 0;
    // 死循环,不断地从任务队列中获取各种任务来执行
    for (;;) {
      	// 执行各种任务
   		...
    }
}

中被执行,但该循环不会空转,执行到某些代码时,会被阻塞 run 方法中有 SELECT 分支

case SelectStrategy.SELECT:
	long curDeadlineNanos = nextScheduledTaskDeadlineNanos();
	if (curDeadlineNanos == -1L) {
        curDeadlineNanos = NONE; // nothing on the calendar
    }
	nextWakeupNanos.set(curDeadlineNanos);
	try {
    	if (!hasTasks()) {
            // 执行 select 方法
            strategy = select(curDeadlineNanos);
        }
    }
	...

会执行 NioEvnetLoop 的 select 方法,该方法内部会根据情况,执行 selector 的有参和无参的 select 方法

private int select(long deadlineNanos) throws IOException {
    // 如果没有指定阻塞事件,就调用 select()
    if (deadlineNanos == NONE) {
        return selector.select();
    }
    // 否则调用 select(timeoutMillis),指定时间内未发生事件就停止阻塞
    // Timeout will only be 0 if deadline is within 5 microsecs
    long timeoutMillis = deadlineToDelayNanos(deadlineNanos + 995000L) / 1000000L;
    return timeoutMillis <= 0 ? selector.selectNow() : selector.select(timeoutMillis);
}

但需要注意的是,select 方法是会阻塞线程的,当没有 IO 事件,但有其他任务需要执行时,需要唤醒线程 唤醒是通过 execute 最后的 if 代码块来完成的

// 有任务需要被执行时,唤醒阻塞的 NIO 线程
if (!addTaskWakesUp && immediate) {
    wakeup(inEventLoop);
}

NioEventLoop.wakeup 唤醒被 selector.select 方法阻塞的 NIO 线程

@Override
protected void wakeup(boolean inEventLoop) {
    // 只有当其他线程给当前 NIO 线程提交任务时(如执行 execute),才会被唤醒
    // 通过 AtomicLong 进行更新,保证每次只能有一个线程唤醒成功
    if (!inEventLoop && nextWakeupNanos.getAndSet(AWAKE) != AWAKE) {
        // 唤醒被 selector.select 方法阻塞的 NIO 线程
        selector.wakeup();
    }
}

唤醒时需要进行两个判断

  • 判断提交任务的是否为 NIO 线程
    • 若是其他线程,才能唤醒 NIO 线程
    • 若是 NIO 线程自己,则不能唤醒
  • 通过AtomicLong保证有多个线程同时提交任务时,只有一个线程能够唤醒 NIO 线程

2.2.3 SELECT 分支

run 方法的 switch 语句有多条分支,具体执行分支的代码由 strategy 变量控制

int strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());
switch (strategy) {
	...
}

strategy 的值由 calculateStrategy 方法确定

@Override
public int calculateStrategy(IntSupplier selectSupplier, boolean hasTasks) throws Exception {
    // selectSupplier.get() 底层是 selector.selectNow();
    return hasTasks ? selectSupplier.get() : SelectStrategy.SELECT;
}

该方法会根据 hasTaks 变量判断任务队列中是否有任务

  • 若有任务,则通过 selectSupplier 获得 strategy 的值
    • get 方法会 selectNow 方法,顺便拿到 IO 事件
      private final IntSupplier selectNowSupplier = new IntSupplier() {
          public int get() throws Exception {
              return NioEventLoop.this.selectNow();
          }
      };
      
      int selectNow() throws IOException {
          return this.selector.selectNow();
      }
      
  • 若没有任务,就会进入 SELECT 分支

也就说,当任务队列中没有任务时,才会进入 SELECT 分支,让 NIO 线程阻塞,而不是空转。若有任务,则会通过 get 方法调用 selector.selectNow 方法,顺便拿到 IO 事件

2.3 JDK 原生 NIO 空轮询 BUG

JDK NIO 空轮询 BUG 也就是 JDK NIO 在 Linux 系统下的 epoll 空轮询问题 在 NioEventLoop 中,因为 run 方法中存在一个死循环,需要通过 selector.select 方法来阻塞线程。但是 select 方法因为 BUG,可能无法阻塞线程,导致循环一直执行,使得 CPU 负载打满

@Override
protected void run() {
    ...
    for(;;){
        ...
        // 可能发生空轮询,无法阻塞 NIO 线程
        strategy = select(curDeadlineNanos);  
        ...     
    
     	if(...) {
			...
     	} else if (unexpectedSelectorWakeup(selectCnt) ){
            // 通过 unexpectedSelectorWakeup 方法中的 rebuildSelector 重建 selector
            // 并将 selectCnt 重置为 0
            selectCnt = 0;
        }
	}
}

Netty 中通过 selectCnt 变量来检测 select 方法是否发生空轮询 BUG 若发生空轮询 BUG,那么 selectCnt 的值会增长是十分迅速。当 selectCnt 的值大于等于 SELECTOR_AUTO_REBUILD_THRESHOLD (默认 512)时,Netty 则判断其出现了空轮询 BUG,进行如下处理

// io.netty.channel.nio.NioEventLoop#select
if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 && selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
    // The selector returned prematurely many times in a row.
    // Rebuild the selector to work around the problem.
    logger.warn("Selector.select() returned prematurely {} times in a row; rebuilding Selector {}.",selectCnt, selector);
    // 重建 selector,将原 selector 的配置信息传给新 selector
    // 再用新 selector 覆盖旧 selector
    rebuildSelector();
    return true;
}

通过 rebuildSelector 方法重建 selector,将原 selector 的配置信息传给新 selector,再用新 selector 覆盖旧 selector。同时将 selectCnt 的值设置为 0

2.4 IORatio

NioEventLoop 可以处理 IO 事件和其他任务。不同的操作所耗费的时间是不同的,想要控制 NioEventLoop 处理 IO 事件花费时间占执行所有操作的总时间的比例,需要通过 ioRatio 来控制 NioEventLoop.run 方法

// 处理 IO 事件时间比例,默认为 50%
final int ioRatio = this.ioRatio;

// 如果 IO 事件时间比例设置为 100%
if (ioRatio == 100) {
    try {
        // 如果需要去处理 IO 事件
        if (strategy > 0) {
            // 先处理 IO 事件
            processSelectedKeys();
        }
    } finally {
        // Ensure we always run tasks.
        // 剩下的时间都去处理普通任务和定时任务
        ranTasks = runAllTasks();
    }
} else if (strategy > 0) { // 如果需要去处理 IO 事件
    // 记录处理 IO 事件前的时间
    final long ioStartTime = System.nanoTime();
    try {
        // 去处理 IO 事件
        processSelectedKeys();
    } finally {
        // Ensure we always run tasks.
        // ioTime 为处理 IO 事件耗费的事件
        final long ioTime = System.nanoTime() - ioStartTime;
        // 计算出处理其他任务的事件
        // 超过设定的时间后,将会停止任务的执行,会在下一次循环中再继续执行
        ranTasks = runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
    }
} else { // 没有 IO 事件需要处理
    // This will run the minimum number of tasks
    // 直接处理普通和定时任务
    ranTasks = runAllTasks(0); 
}

通过 ioRatio 控制各个任务执行的过程如下

  • 判断 ioRatio 是否为 100
    • 若是,判断是否需要处理 IO 事件(strategy>0)
      • 若需要处理 IO 事件,则先处理 IO 事件
    • 若否(或 IO 事件已经处理完毕),接下来去执行所有的普通任务和定时任务,直到所有任务都被处理完
      // 没有指定执行任务的时间
      ranTasks = runAllTasks();
      
  • 若 ioRatio 不为 100
    • 先去处理 IO 事件,记录处理 IO 事件所花费的事件保存在 ioTime 中
    • 接下来去处理其他任务,根据 ioTime 与 ioRatio 计算执行其他任务可用的时间
      // 比如 ioTime 为 10s,ioRatio 为 50
      // 那么通过 10*(100-50)/50=10 计算出其他任务可用的时间为 10s
      // 处理 IO 事件占用的事件总比例为 50%
      ranTasks = runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
      
    • 执行其他任务一旦超过可用时间,则会停止执行,在下一次循环中再继续执行
  • 若没有 IO 事件需要处理,则去执行最少数量的普通任务和定时任务
    // 运行最少数量的任务
    ranTasks = runAllTasks(0);
    

但是此机制无法避免单个任务执行时间过长导致的线程阻塞,因为 runAllTasks 是在每执行一个任务后判断是否超时,如果超时就 break 返回。

2.5 处理事件

IO 事件是通过 NioEventLoop.processSelectedKeys() 方法处理的

private void processSelectedKeys() {
    // 如果 selectedKeys 是基于数组的
    // 一般情况下都走这个分支
    if (selectedKeys != null) {
        // 处理各种 IO 事件
        processSelectedKeysOptimized();
    } else {
        processSelectedKeysPlain(selector.selectedKeys());
    }
}

processSelectedKeysOptimized 方法

private void processSelectedKeysOptimized() {
    for (int i = 0; i < selectedKeys.size; ++i) {
        // 拿到 SelectionKeyec
        final SelectionKey k = selectedKeys.keys[i];
        // null out entry in the array to allow to have it GC'ed once the Channel close
        // See   https://github.com/netty/netty/issues/2363  
        selectedKeys.keys[i] = null;

        // 获取 SelectionKey 上的附件,即 NioServerSocketChannel
        final Object a = k.attachment();

        if (a instanceof AbstractNioChannel) {
            // 处理事件,传入附件 NioServerSocketChannel
            processSelectedKey(k, (AbstractNioChannel) a);
        } else {
            @SuppressWarnings("unchecked")
            NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
            processSelectedKey(k, task);
        }

        if (needsToSelectAgain) {
            // null out entries in the array to allow to have it GC'ed once the Channel close
            // See   https://github.com/netty/netty/issues/2363  
            selectedKeys.reset(i + 1);

            selectAgain();
            i = -1;
        }
    }
}

该方法中通过 fori 的方法,遍历基于数组的 SelectedKey,通过

final SelectionKey k = selectedKeys.keys[i];

获取到 SelectionKey,然后获取其再 Register 时添加的附件 NioServerSocketChannel

// 获取 SelectionKey 上的附件,即 NioServerSocketChannel
final Object a = k.attachment();

如果附件继承自 AbstractNioChannel,则会调用

// 处理事件,传入附件 NioServerSocketChannel
processSelectedKey(k, (AbstractNioChannel) a);

去处理各个事件 真正处理各种事件的方法 processSelectedKey 获取 SelectionKey 的事件,然后进行相应处理

private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
    final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
    if (!k.isValid()) {
        final EventLoop eventLoop;
        try {
            eventLoop = ch.eventLoop();
        } catch (Throwable ignored) {
            // If the channel implementation throws an exception because there is no event loop, we ignore this
            // because we are only trying to determine if ch is registered to this event loop and thus has authority
            // to close ch.
            return;
        }
        // Only close ch if ch is still registered to this EventLoop. ch could have deregistered from the event loop
        // and thus the SelectionKey could be cancelled as part of the deregistration process, but the channel is
        // still healthy and should not be closed.
        // See   https://github.com/netty/netty/issues/5125  
        if (eventLoop == this) {
            // close the channel if the key is not valid anymore
            unsafe.close(unsafe.voidPromise());
        }
        return;
    }

    try {
        int readyOps = k.readyOps();
        // We first need to call finishConnect() before try to trigger a read(...) or write(...) as otherwise
        // the NIO JDK channel implementation may throw a NotYetConnectedException.
        if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
            // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
            // See   https://github.com/netty/netty/issues/924  
            int ops = k.interestOps();
            ops &= ~SelectionKey.OP_CONNECT;
            k.interestOps(ops);

            unsafe.finishConnect();
        }

        // Process OP_WRITE first as we may be able to write some queued buffers and so free memory.
        if ((readyOps & SelectionKey.OP_WRITE) != 0) {
            // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
            ch.unsafe().forceFlush();
        }

        // Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
        // to a spin loop
        if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
            unsafe.read();
        }
    } catch (CancelledKeyException ignored) {
        unsafe.close(unsafe.voidPromise());
    }
}

三. Accept 事件剖析

3.1 整体流程

NIO 中处理 Accept 事件主要有以下六步

  • selector.select() 阻塞线程,直到事件发生
  • 遍历 selectionKeys
  • 获取一个 key,判断事件类型是否为 Accept

  • 创建 SocketChannel,设置为非阻塞
  • 将 SocketChannel 注册到 selector 中
  • 关注 selectionKeys 的 read 事件

代码如下

// 阻塞直到事件发生
selector.select();

Iterator<SelectionKey> iter = selector.selectionKeys().iterator();
while (iter.hasNext()) {    
    // 拿到一个事件
    SelectionKey key = iter.next();
    
    // 如果是 accept 事件
    if (key.isAcceptable()) {
        
        // 执行 accept,获得 SocketChannel
        SocketChannel channel = serverSocketChannel.accept();
        channel.configureBlocking(false);
        
        // 将 SocketChannel 注册到 selector 中,并关注 read 事件
        channel.register(selector, SelectionKey.OP_READ);
    }
    // ...
}

其中前三步,在 NioEventLoop 剖析中已经分析过了,所以接下来主要分析后三步

3.2 SocketChannel 的创建与注册

发生 Accept 事件后,会执行 NioEventLoop.run 方法的如下 if 分支

//io.netty.channel.nio.NioEventLoop#processSelectedKey(java.nio.channels.SelectionKey, io.netty.channel.nio.AbstractNioChannel)
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
	unsafe.read();
}

NioMessageUnsafe.read 方法

public void read() {

    ...
    
    try {
        try {
            do {
				// doReadMessages 中执行了 accept 获得了 SocketChannel
                // 并创建 NioSocketChannel 作为消息放入 readBuf
                // readBuf 是一个 ArrayList 用来缓存消息
                // private final List<Object> readBuf = new ArrayList<Object>();
                int localRead = doReadMessages(readBuf);
                
                ...
                
				// localRead 值为 1,就一条消息,即接收一个客户端连接
                allocHandle.incMessagesRead(localRead);
            } while (allocHandle.continueReading());
        } catch (Throwable t) {
            exception = t;
        }

        int size = readBuf.size();
        for (int i = 0; i < size; i ++) {
            readPending = false;
            // 触发 read 事件,让 pipeline 上的 handler 处理
            // ServerBootstrapAcceptor.channelRead
            pipeline.fireChannelRead(readBuf.get(i));
        }
        
        ...
        
    } finally {
        if (!readPending && !config.isAutoRead()) {
            removeReadOp();
        }
    }
}

NioSocketChannel.doReadMessages 方法 该方法中处理 accpet 事件,获得 SocketChannel,同时创建了 NioSocketChannel,作为消息放在了 readBuf 中

//io.netty.channel.socket.nio.NioServerSocketChannel#doReadMessages
@Override
protected int doReadMessages(List<Object> buf) throws Exception {
    // 处理 accpet 事件,获得 SocketChannel
    SocketChannel ch = SocketUtils.accept(javaChannel());

    try {
        if (ch != null) {
            // 创建了 NioSocketChannel,作为消息放在了 readBuf 中
            buf.add(new NioSocketChannel(this, ch));
            return 1;
        }
    } catch (Throwable t) {
       ...
    }

    return 0;
}

在调用 read 方法末尾,会使用 pipeline.fireChannelRead(readBuf.get(i)) 触发 NioServerSocketChannel 上配置的流水线 Handler 的 channelRead 事件,也就是 ServerBootstrapAcceptor.channelRead() (该 Handler 是在 NioServerSocketChannel 初始化的过程中设置的,详见 1.3.2):

public void channelRead(ChannelHandlerContext ctx, Object msg) {
    // 这时的 msg 是 NioSocketChannel
    final Channel child = (Channel) msg;

    // NioSocketChannel 添加 childHandler,即初始化器
    child.pipeline().addLast(childHandler);

    // 设置选项
    setChannelOptions(child, childOptions, logger);

    for (Entry<AttributeKey<?>, Object> e: childAttrs) {
        child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
    }

    try {
        // 注册 NioSocketChannel 到 nio worker 线程,接下来的处理也移交至 nio worker 线程
        childGroup.register(child).addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                if (!future.isSuccess()) {
                    forceClose(child, future.cause());
                }
            }
        });
    } catch (Throwable t) {
        forceClose(child, t);
    }
}

通过 AbstractUnsafe.register 方法,将 SocketChannel 注册到了 Selector 中,过程与启动流程中的 Register 过程类似

// io.netty.channel.AbstractChannel.AbstractUnsafe#register
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
    
    ...

    AbstractChannel.this.eventLoop = eventLoop;

    if (eventLoop.inEventLoop()) {
        register0(promise);
    } else {
        try {
            // 这行代码完成的是 nio boss -> nio worker 线程的切换
            eventLoop.execute(new Runnable() {
                @Override
                public void run() {
                    // 真正的注册操作
                    register0(promise);
                }
            });
        } catch (Throwable t) {
            ...
        }
    }
}

AbstractChannel.AbstractUnsafe.register0

//io.netty.channel.AbstractChannel.AbstractUnsafe#register0
private void register0(ChannelPromise promise) {
    try {
        
        ...
            
        // 该方法将 SocketChannel 注册到 Selector 中
        doRegister();
        
        // 执行初始化器,执行前 pipeline 中只有 head -> 初始化器 -> tail
        pipeline.invokeHandlerAddedIfNeeded();
        // 执行后就是 head -> 自定义 handler -> tail

        safeSetSuccess(promise);
        pipeline.fireChannelRegistered();
        
        if (isActive()) {
            if (firstRegistration) {
                // 触发 pipeline 上 active 事件
                pipeline.fireChannelActive();
            } else if (config().isAutoRead()) {
                beginRead();
            }
        }
    } catch (Throwable t) {
        closeForcibly();
        closeFuture.setClosed();
        safeSetFailure(promise, t);
    }
}

AbstractNioChannel.doRegister 将 SocketChannel 注册到 Selector 中

// io.netty.channel.nio.AbstractNioChannel#doRegister
@Override
protected void doRegister() throws Exception {
    boolean selected = false;
    for (;;) {
        try {
            // 将 Selector 注册到 Selector 中
            selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
            return;
        } catch (CancelledKeyException e) {
            ...
        }
    }
}

HeadContext.channelActive

public void channelActive(ChannelHandlerContext ctx) {
    ctx.fireChannelActive();
	// 触发 read(NioSocketChannel 这里 read 只是为了触发 channel 的事件注册,还未涉及数据读取)
    readIfIsAutoRead();
}

AbstractNioChannel.doBeginRead ,通过该方法,SocketChannel 关注了 read 事件

protected void doBeginRead() throws Exception {
    // Channel.read() or ChannelHandlerContext.read() was called
    final SelectionKey selectionKey = this.selectionKey;
    if (!selectionKey.isValid()) {
        return;
    }

    readPending = true;
	// 这时候 interestOps 是 0
    final int interestOps = selectionKey.interestOps();
    if ((interestOps & readInterestOp) == 0) {
        // 关注 read 事件
        selectionKey.interestOps(interestOps | readInterestOp);
    }
}

四. read 事件剖析

read 事件的处理也是在

//io.netty.channel.nio.NioEventLoop#processSelectedKey(java.nio.channels.SelectionKey, io.netty.channel.nio.AbstractNioChannel)
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
	unsafe.read();
}

分支中,通过 unsafe.read() 方法处理的,不过此处调用的方法在 AbstractNioByteChannel.NioByteUnsafe 类中

@Override
public final void read() {
    // 获得 Channel 的配置
    final ChannelConfig config = config();
    if (shouldBreakReadReady(config)) {
        clearReadPending();
        return;
    }
    final ChannelPipeline pipeline = pipeline();
	// 根据配置创建 ByteBufAllocator(池化非池化、直接非直接内存)
	final ByteBufAllocator allocator = config.getAllocator();
    // 用来分配 byteBuf,确定单次读取大小
    final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
    allocHandle.reset(config);

    ByteBuf byteBuf = null;
    boolean close = false;
    try {
        do {
            // 创建 ByteBuf
            byteBuf = allocHandle.allocate(allocator);
            // 读取内容,放入 ByteBUf 中
            allocHandle.lastBytesRead(doReadBytes(byteBuf));
            if (allocHandle.lastBytesRead() <= 0) {
                byteBuf.release();
                byteBuf = null;
                close = allocHandle.lastBytesRead() < 0;
                if (close) {
                    readPending = false;
                }
                break;
            }

            allocHandle.incMessagesRead(1);
            readPending = false;
            // 触发 read 事件,让 pipeline 上的 handler 处理
            // 这时是处理 NioSocketChannel 上的 handler
            pipeline.fireChannelRead(byteBuf);
            byteBuf = null;
        } 
        // 是否要继续循环
        while (allocHandle.continueReading());

        allocHandle.readComplete();
        // 触发 read complete 事件
        pipeline.fireChannelReadComplete();

        if (close) {
            closeOnRead(pipeline);
        }
    } catch (Throwable t) {
        handleReadException(pipeline, byteBuf, t, close, allocHandle);
    } finally {
         // Check if there is a readPending which was not processed yet.
         // This could be for two reasons:
         // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
         // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
         //
         // See   https://github.com/netty/netty/issues/2254  
        if (!readPending && !config.isAutoRead()) {
            removeReadOp();
        }
    }
}

DefaultMaxMessagesRecvByteBufAllocator.MaxMessageHandle.continueReading(io.netty.util.UncheckedBooleanSupplier)

public boolean continueReading(UncheckedBooleanSupplier maybeMoreDataSupplier) {
    return 
           // 一般为 true
           config.isAutoRead() &&
           // respectMaybeMoreData 默认为 true
           // maybeMoreDataSupplier 的逻辑是如果预期读取字节与实际读取字节相等,返回 true
           (!respectMaybeMoreData || maybeMoreDataSupplier.get()) &&
           // 小于最大次数,maxMessagePerRead 默认 16
           totalMessages < maxMessagePerRead &&
           // 实际读到了数据
           totalBytesRead > 0;
}

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。
列表为空,暂无数据

关于作者

漫雪独思

暂无简介

文章
评论
28 人气
更多

推荐作者

櫻之舞

文章 0 评论 0

弥枳

文章 0 评论 0

m2429

文章 0 评论 0

野却迷人

文章 0 评论 0

我怀念的。

文章 0 评论 0

    我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
    原文