Netty 源码分析
一. 服务端启动流程
NIO 原生服务端启动代码:
//1. 创建 Selector,管理多个 Channel
Selector selector = Selector.open();
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.configureBlocking(false);
serverSocketChannel.bind(new InetSocketAddress(8080));
// 2. 建立 ServerSocketChannel 与 Selector 的联系。SelectionKey 用于事件发生后,通过它可以知道是哪个 channel 的事件,以及事件类型
SelectionKey sscSelectionKey = serverSocketChannel.register(selector, 0, null);
// 3. 关注 accept 事件
sscSelectionKey.interestOps(SelectionKey.OP_ACCEPT);
Netty 服务端启动代码:
// 1、启动器,负责装配 netty 组件,启动服务器
new ServerBootstrap()
// 2、创建 NioEventLoopGroup,可以简单理解为 线程池 + Selector
.group(new NioEventLoopGroup())
// 3、选择服务器的 ServerSocketChannel 实现
.channel(NioServerSocketChannel.class)
// 4、绑定端口,并启动服务端
.bind(8000);
Netty 服务端启动流程本质上就是对 NIO 基础 API 的封装。可以将 Netty 启动流程简化成如下代码:
// netty 中使用 NioEventLoopGroup (简称 nio boss 线程)来封装线程和 selector
Selector selector = Selector.open();
// 创建 NioServerSocketChannel,同时会初始化它关联的 handler,以及为原生 ssc 存储 config
NioServerSocketChannel attachment = new NioServerSocketChannel();
// 创建 NioServerSocketChannel 时,创建了 java 原生的 ServerSocketChannel
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.configureBlocking(false);
// 启动 nio boss 线程执行接下来的操作
//注册(仅关联 selector 和 NioServerSocketChannel),未关注事件
SelectionKey selectionKey = serverSocketChannel.register(selector, 0, attachment);
// head -> 初始化器 -> ServerBootstrapAcceptor -> tail,初始化器是一次性的,只为添加 acceptor
// 绑定端口
serverSocketChannel.bind(new InetSocketAddress(8080));
// 触发 channel active 事件,在 head 中关注 op_accept 事件
selectionKey.interestOps(SelectionKey.OP_ACCEPT);
- 获得选择器 Selector,Netty 中使用 NioEventloopGroup 中的 NioEventloop 封装了线程和选择器
- 创建
NioServerSocketChannel
,该 Channel作为附件添加到ServerSocketChannel
中 - 创建
ServerSocketChannel
,将其设置为非阻塞模式,并注册到 Selector 中,此时未关注事件,但是添加了附件NioServerSocketChannel
- 绑定端口
- 通过
interestOps
设置感兴趣的事件
1.1 bind
选择器 Selector 的创建是在 NioEventloopGroup 中完成的。NioServerSocketChannel 与 ServerSocketChannel 的创建、ServerSocketChannel 注册到 Selector 中以及绑定操作都是由 bind
方法完成的 所以服务器启动的入口便是 io.netty.bootstrap.ServerBootstrap.bind
public ChannelFuture bind(SocketAddress localAddress) {
validate();
return doBind(ObjectUtil.checkNotNull(localAddress, "localAddress"));
}
1.2 doBind
真正完成初始化、注册以及绑定的方法是 io.netty.bootstrap.AbstractBootstrap.doBind
dobind 方法在主线程中执行
private ChannelFuture doBind(final SocketAddress localAddress) {
// 负责 NioServerSocketChannel 和 ServerSocketChannel 的创建
// ServerSocketChannel 的注册工作
// init 由 main 线程完成,regisetr 由 NIO 线程完成
final ChannelFuture regFuture = initAndRegister();
final Channel channel = regFuture.channel();
if (regFuture.cause() != null) {
return regFuture;
}
// 因为 register 操作是异步的
// 所以要判断主线程执行到这里时,register 操作是否已经执行完毕
if (regFuture.isDone()) {
// At this point we know that the registration was complete and successful.
ChannelPromise promise = channel.newPromise();
// 执行 doBind0 绑定操作
doBind0(regFuture, channel, localAddress, promise);
return promise;
} else {
// Registration future is almost always fulfilled already, but just in case it's not.
// 如果 register 操作还没执行完,就会到这个分支中来
final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
// 添加监听器,NIO 线程异步进行 doBind0 操作
regFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
Throwable cause = future.cause();
if (cause != null) {
// Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
// IllegalStateException once we try to access the EventLoop of the Channel.
promise.setFailure(cause);
} else {
// Registration was successful, so set the correct executor to use.
// See https://github.com/netty/netty/issues/2586
promise.registered();
doBind0(regFuture, channel, localAddress, promise);
}
}
});
return promise;
}
}
- doBind() 中有两个重要方法
initAndRegister()
和doBind0(regFuture, channel, localAddress, promise)
- initAndRegister主要负责 NioServerSocketChannel 和 ServerSocketChannel 的创建(主线程中完成)与 ServerSocketChannel 注册(NIO 线程中完成)工作
- doBind0则负责连接的创建工作
1.3 initAndRegisterd
final ChannelFuture initAndRegister() {
Channel channel = null;
try {
channel = channelFactory.newChannel();
init(channel);
} catch (Throwable t) {
if (channel != null) {
// channel can be null if newChannel crashed (eg SocketException("too many open files"))
channel.unsafe().closeForcibly();
// as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
}
// as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);
}
ChannelFuture regFuture = config().group().register(channel);
if (regFuture.cause() != null) {
if (channel.isRegistered()) {
channel.close();
} else {
channel.unsafe().closeForcibly();
}
}
// If we are here and the promise is not failed, it's one of the following cases:
// 1) If we attempted registration from the event loop, the registration has been completed at this point.
// i.e. It's safe to attempt bind() or connect() now because the channel has been registered.
// 2) If we attempted registration from the other thread, the registration request has been successfully
// added to the event loop's task queue for later execution.
// i.e. It's safe to attempt bind() or connect() now:
// because bind() or connect() will be executed *after* the scheduled registration task is executed
// because register(), bind(), and connect() are all bound to the same thread.
return regFuture;
}
1.3.1 newChannel
// io.netty.channel.ReflectiveChannelFactory#newChannel
@Override
public T newChannel() {
try {
// 通过反射调用 NioServerSocketChannel 的构造方法
// 创建 NioServerSocketChannel 对象
return constructor.newInstance();
} catch (Throwable t) {
throw new ChannelException("Unable to create Channel from class " + constructor.getDeclaringClass(), t);
}
}
在内部会通过反射调用 NioServerSocketChannel 的构造方法,创建 NioServerSocketChannel 对象:
public NioServerSocketChannel() {
// 创建了 ServerSocketChannel 实例
this(newSocket(DEFAULT_SELECTOR_PROVIDER));
}
在创建 NioServerSocketChannel 时,会在构造方法里面同步创建 NIO 原生的 ServerSocketChannel
。
1.3.2 init
@Override
void init(Channel channel) {
...
// NioServerSocketChannel 的 Pipeline
ChannelPipeline p = channel.pipeline();
...
// 向 Pipeline 中添加了一个 handler,该 handler 等待被调用
p.addLast(new ChannelInitializer<Channel>() {
@Override
// register 之后才调用该方法
public void initChannel(final Channel ch) {
final ChannelPipeline pipeline = ch.pipeline();
// 创建 handler 并加入到 pipeline 中
ChannelHandler handler = config.handler();
if (handler != null) {
pipeline.addLast(handler);
}
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
// 添加新的 handler,在发生 Accept 事件后建立连接
pipeline.addLast(new ServerBootstrapAcceptor(
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
}
- 该方法向 NioServerSocketChannel 中添加了两个 handler,添加操作在 register 之后被执行
- 一个 handler 负责设置配置
- 一个 handler 负责发生 Accepet 事件后建立连接
1.3.3 register
init 执行完毕后,便执行 ChannelFuture regFuture = config().group().register(channel)
操作 该方法最终调用的是 promise.channel().unsafe().register(this, promise)
方法 promise.channel().unsafe().register(this, promise)
@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
...
// 获取 EventLoop
AbstractChannel.this.eventLoop = eventLoop;
// 此处完成了由 主线程 到 NIO 线程 的切换
// eventLoop.inEventLoop() 用于判断当前线程是否为 NIO 线程
if (eventLoop.inEventLoop()) {
register0(promise);
} else {
try {
// 向 NIO 线程中添加任务
eventLoop.execute(new Runnable() {
@Override
public void run() {
// 该方法中会执行 doRegister
// 执行真正的注册操作
register0(promise);
}
});
} catch (Throwable t) {
...
}
}
}
register0 方法
//io.netty.channel.AbstractChannel.AbstractUnsafe#register0
private void register0(ChannelPromise promise) {
try {
...
// 执行真正的注册操作
doRegister();
neverRegistered = false;
registered = true;
// Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the
// user may already fire events through the pipeline in the ChannelFutureListener.
// 调用 init 中的 initChannel 方法
pipeline.invokeHandlerAddedIfNeeded();
...
} catch (Throwable t) {
...
}
}
doRegister 方法
// io.netty.channel.nio.AbstractNioChannel#doRegister
@Override
protected void doRegister() throws Exception {
boolean selected = false;
for (;;) {
try {
// javaChannel() 即为 ServerSocketChannel
// eventLoop().unwrappedSelector() 获取 eventLoop 中的 Selector
// this 为 NIOServerSocketChannel,作为附件
selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
return;
} catch (CancelledKeyException e) {
...
}
}
}
回调 initChannel
//io.netty.bootstrap.ServerBootstrap#init
@Override
void init(Channel channel) throws Exception {
...
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(final Channel ch) throws Exception {
final ChannelPipeline pipeline = ch.pipeline();
ChannelHandler handler = config.handler();
if (handler != null) {
pipeline.addLast(handler);
}
// 添加新任务,任务负责添加 handler
// 该 handler 负责发生 Accepet 事件后建立连接
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
pipeline.addLast(new ServerBootstrapAcceptor(
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
}
Register 主要完成了以下三个操作
- 完成了主线程到 NIO 的线程切换
- 通过
eventLoop.inEventLoop()
进行线程判断,判断当前线程是否为 NIO 线程 - 切换的方式为让 eventLoop 执行 register 的操作
- register 的操作在 NIO 线程中完成
- 通过
- 调用 doRegister 方法
// javaChannel() 即为 ServerSocketChannel // eventLoop().unwrappedSelector() 获取 eventLoop 中的 Selector // this 为 NIOServerSocketChannel,作为附件 selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
- 将 ServerSocketChannel 注册到 EventLoop 的 Selector 中
- 此时还未关注事件
- 添加 NioServerSocketChannel 附件
- 通过
invokeHandlerAddedIfNeeded
调用 init 中的initChannel
方法- initChannel 方法主要创建了 两个 handler
- 一个 handler 负责设置配置
- 一个 handler 负责发生 Accept 事件后建立连接
- initChannel 方法主要创建了 两个 handler
1.4 doBind0
1.4.1 绑定端口
在 doRegister
和 invokeHandlerAddedIfNeeded
操作中的完成后,会调用 safeSetSuccess(promise)
方法,向 Promise 中设置执行成功的结果。此时 doBind
方法中由 initAndRegister
返回的 ChannelFuture 对象 regFuture 便会由 NIO 线程异步执行 doBind0 绑定操作
//io.netty.bootstrap.AbstractBootstrap#doBind
// initAndRegister 为异步方法,会返回 ChannelFuture 对象
final ChannelFuture regFuture = initAndRegister();
regFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
Throwable cause = future.cause();
if (cause != null) {
// Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
// IllegalStateException once we try to access the EventLoop of the Channel.
promise.setFailure(cause);
} else {
// Registration was successful, so set the correct executor to use.
// See https://github.com/netty/netty/issues/2586
promise.registered();
// 如果没有异常,则执行绑定操作
doBind0(regFuture, channel, localAddress, promise);
}
}
});
doBind0 最底层调用的是 ServerSocketChannel 的 bind 方法 NioServerSocketChannel.doBind 方法通过该方法,绑定了对应的端口。
// io.netty.channel.socket.nio.NioServerSocketChannel#doBind
@SuppressJava6Requirement(reason = "Usage guarded by java version check")
@Override
protected void doBind(SocketAddress localAddress) throws Exception {
if (PlatformDependent.javaVersion() >= 7) {
// 调用 ServerSocketChannel 的 bind 方法,绑定端口
javaChannel().bind(localAddress, config.getBacklog());
} else {
javaChannel().socket().bind(localAddress, config.getBacklog());
}
}
1.4.2 关注事件
在绑定端口操作完成后,会判断各种所有初始化操作是否已经完成,若完成,则会添加 ServerSocketChannel 感兴趣的事件
if (!wasActive && isActive()) {
invokeLater(new Runnable() {
@Override
public void run() {
pipeline.fireChannelActive();
}
});
}
最终在 AbstractNioChannel.doBeginRead
方法中,会添加 ServerSocketChannel 添加 Accept 事件
@Override
protected void doBeginRead() throws Exception {
// Channel.read() or ChannelHandlerContext.read() was called
final SelectionKey selectionKey = this.selectionKey;
if (!selectionKey.isValid()) {
return;
}
readPending = true;
final int interestOps = selectionKey.interestOps();
// 如果 ServerSocketChannel 没有关注 Accept 事件
if ((interestOps & readInterestOp) == 0) {
// 则让其关注 Accepet 事件
// readInterestOp 取值是 16
// 在 NioServerSocketChannel 创建时初始化
selectionKey.interestOps(interestOps | readInterestOp);
}
}
注意:此处设置 interestOps 时使用的方法,避免覆盖关注的其他事件
- 首先获取 Channel 所有感兴趣的事件
final int interestOps = selectionKey.interestOps();
- 然后再设置其感兴趣的事件
selectionKey.interestOps(interestOps | readInterestOp);
1.5 总结
通过上述步骤,完成了
- NioServerSocketChannel 与 ServerSocketChannel 的创建
- ServerSocketChannel 绑定到 EventLoop 的 Selecot 中,并添加 NioServerSocketChannel 附件
- 绑定了对应的端口
- 关注了 Accept 事件
二. NioEventLoop 剖析
2.1 组成
NioEventLoop 的重要组成部分有三个
- Selector
public final class NioEventLoop extends SingleThreadEventLoop { ... // selector 中的 selectedKeys 是基于数组的 // unwrappedSelector 中的 selectedKeys 是基于 HashSet 的 private Selector selector; private Selector unwrappedSelector; private SelectedSelectionKeySet selectedKeys; ... }
- Thread 与 TaskQueue
public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor { // 任务队列 private final Queue<Runnable> taskQueue; // 线程 private volatile Thread thread; }
2.1.1 Selector 的创建
Selector 是在 NioEventLoop 的构造方法中被创建的
NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider, SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler, EventLoopTaskQueueFactory queueFactory) {
...
// 初始化 selector,初始化过程在 openSelector 中
final SelectorTuple selectorTuple = openSelector();
this.selector = selectorTuple.selector;
this.unwrappedSelector = selectorTuple.unwrappedSelector;
}
private SelectorTuple openSelector() {
final Selector unwrappedSelector;
try {
// 此处等同于 Selector.open() 方法
// 创建了 unwrappedSelector 对象
unwrappedSelector = provider.openSelector();
} catch (IOException e) {
throw new ChannelException("failed to open a new selector", e);
}
}
NioEventLoop 的构造方法中,调用了 openSelector()
方法, 该方法会返回一个 SelectorTuple对象,该方法是创建 Selector 的核心方法。 openSelector()
方法内部调用了
unwrappedSelector = provider.openSelector();
获得了 Selector 对象 unwrappedSelector
后面会通过反射,修改 unwrappedSelector
中 SelectedKeys 的实现,然后构造一个 SelectedSelectionKeySetSelector
包装类,包装原始的 unwrappedSelector。最后通过 SelectorTuple 的构造方法,将该 Selector 的值赋给 SelectorTuple 类中的 selector 与 unwrappedSelector
private static final class SelectorTuple {
final Selector unwrappedSelector;
final Selector selector;
SelectorTuple(Selector unwrappedSelector) {
this.unwrappedSelector = unwrappedSelector;
this.selector = unwrappedSelector;
}
/**
* 一般调用的是这个构造方法
*/
SelectorTuple(Selector unwrappedSelector, Selector selector) {
this.unwrappedSelector = unwrappedSelector;
this.selector = selector;
}
}
再通过 NioEventLoop 的构造方法,将 SelectorTuple 中的 Selector 赋值给 NioEventLoop 中的 Selector
2.1.2 两个 Selector 的区别
NioEventLoop 中有 selector 和 unwrappedSelector 两个 Selector,它们的区别主要在于 SelectedKeys 的数据结构
- selector 中的 SelectedKeys 是基于数组的
- unwrappedSelector 中的是基于 HashSet的
这样做的主要目的是,数组的遍历效率要高于 HashSet
private SelectorTuple openSelector() {
final Selector unwrappedSelector;
try {
unwrappedSelector = provider.openSelector();
} catch (IOException e) {
throw new ChannelException("failed to open a new selector", e);
}
...
// 获得基于数组的 selectedKeySet 实现
final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();
Object maybeException = AccessController.doPrivileged(new PrivilegedAction<Object>() {
@Override
public Object run() {
try {
// 通过反射拿到 unwrappedSelector 中的 selectedKeys 属性
Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");
Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");
...
// 暴力反射,修改私有属性
Throwable cause = ReflectionUtil.trySetAccessible(selectedKeysField, true);
if (cause != null) {
return cause;
}
cause = ReflectionUtil.trySetAccessible(publicSelectedKeysField, true);
if (cause != null) {
return cause;
}
// 替换为基于数组的 selectedKeys 实现
selectedKeysField.set(unwrappedSelector, selectedKeySet);
publicSelectedKeysField.set(unwrappedSelector, selectedKeySet);
return null;
} catch (NoSuchFieldException e) {
return e;
} catch (IllegalAccessException e) {
return e;
}
}
});
selectedKeys = selectedKeySet;
// 调用构造函数,创建 unwrappedSelector 与 selector
return new SelectorTuple(unwrappedSelector,
new SelectedSelectionKeySetSelector(unwrappedSelector, selectedKeySet));
}
获得数组实现 SelectedKeys 的 Selector 的原理是反射,主要步骤如下
- 获得基于数组的 selectedKeySet 实现
// 获得基于数组的 selectedKeySet 实现 final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet(); SelectedSelectionKeySet() { keys = new SelectionKey[1024]; }
- 通过反射拿到 unwrappedSelector 中的 SelectedKeySet 并将其替换为 selectedKeySet
- 通过 Selector 的构造方法获得 selector
new SelectedSelectionKeySetSelector(unwrappedSelector, selectedKeySet);
- 通过 SelectorTuple 的构造方法获得拥有两种 Selector 的 SelectorTuple 对象,并返回给 NioEventLoop
// 调用构造函数,创建 unwrappedSelector 与 selector return new SelectorTuple(unwrappedSelector, new SelectedSelectionKeySetSelector(unwrappedSelector, selectedKeySet));
2.2 Nio 线程的启动机制
2.2.1 启动
NioEventLoop 中的线程,在首次执行任务时,才会被创建,且只会被创建一次 测试代码
public class TestNioEventLoop {
public static void main(String[] args) {
EventLoop eventLoop = new NioEventLoopGroup().next();
// 使用 NioEventLoop 执行任务
eventLoop.execute(()->{
System.out.println("hello");
});
}
}
进入 execute
执行任务
@Override
public void execute(Runnable task) {
// 检测传入的任务是否为空,为空会抛出 NullPointerException
ObjectUtil.checkNotNull(task, "task");
// 执行任务
// 此处判断了任务是否为懒加载任务,wakesUpForTask 的返回值只会为 true
execute(task, !(task instanceof LazyRunnable) && wakesUpForTask(task));
}
进入上述代码的 execute
方法
private void execute(Runnable task, boolean immediate) {
// 判断当前线程是否为 NIO 线程
// 判断方法为 return thread == this.thread;
// this.thread 即为 NIO 线程,首次执行任务时,其为 null
boolean inEventLoop = inEventLoop();
// 向任务队列 taskQueue 中添加任务
addTask(task);
// 当前线程不是 NIO 线程,则进入 if 语句
if (!inEventLoop) {
// 启动 NIO 线程的核心方法
startThread();
...
}
// 有任务需要被执行时,唤醒阻塞的 NIO 线程
if (!addTaskWakesUp && immediate) {
wakeup(inEventLoop);
}
}
进入 startThread
方法
private void startThread() {
// 查看 NIO 线程状态是否为未启动
// 该 if 代码块只会执行一次
// state 一开始的值就是 ST_NOT_STARTED
// private volatile int state = ST_NOT_STARTED;
if (state == ST_NOT_STARTED) {
// 通过原子属性更新器将状态更新为启动(ST_STARTED)
if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
boolean success = false;
try {
// 执行启动线程
doStartThread();
success = true;
} finally {
if (!success) {
STATE_UPDATER.compareAndSet(this, ST_STARTED, ST_NOT_STARTED);
}
}
}
}
}
进入 doStartThread
,真正创建 NIO 线程并执行任务
private void doStartThread() {
assert thread == null;
// 创建 NIO 线程并执行任务
executor.execute(new Runnable() {
@Override
public void run() {
// thread 即为 NIO 线程
thread = Thread.currentThread();
if (interrupted) {
thread.interrupt();
}
boolean success = false;
updateLastExecutionTime();
try {
// 执行内部 run 方法
SingleThreadEventExecutor.this.run();
success = true;
}
...
});
}
通过 SingleThreadEventExecutor.this.run()
执行传入的任务(task) 该 run 方法是NioEvnetLoop 的 run 方法
//io.netty.channel.nio.NioEventLoop#run
@Override
protected void run() {
int selectCnt = 0;
// 死循环,不断地从任务队列中获取各种任务来执行
for (;;) {
// 执行各种任务
try {
int strategy;
try {
strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());
switch (strategy) {
case SelectStrategy.CONTINUE:
continue;
case SelectStrategy.BUSY_WAIT:
// fall-through to SELECT since the busy-wait is not supported with NIO
case SelectStrategy.SELECT:
long curDeadlineNanos = nextScheduledTaskDeadlineNanos();
if (curDeadlineNanos == -1L) {
curDeadlineNanos = NONE; // nothing on the calendar
}
nextWakeupNanos.set(curDeadlineNanos);
try {
if (!hasTasks()) {
strategy = select(curDeadlineNanos);
}
} finally {
// This update is just to help block unnecessary selector wakeups
// so use of lazySet is ok (no race condition)
nextWakeupNanos.lazySet(AWAKE);
}
// fall through
default:
}
}
}
}
2.2.2 唤醒
NioEvnetLoop 需要 IO 事件、普通任务以及定时任务,任务在 run 方法的 for 循环中
@Override
protected void run() {
int selectCnt = 0;
// 死循环,不断地从任务队列中获取各种任务来执行
for (;;) {
// 执行各种任务
...
}
}
中被执行,但该循环不会空转,执行到某些代码时,会被阻塞 run 方法中有 SELECT 分支
case SelectStrategy.SELECT:
long curDeadlineNanos = nextScheduledTaskDeadlineNanos();
if (curDeadlineNanos == -1L) {
curDeadlineNanos = NONE; // nothing on the calendar
}
nextWakeupNanos.set(curDeadlineNanos);
try {
if (!hasTasks()) {
// 执行 select 方法
strategy = select(curDeadlineNanos);
}
}
...
会执行 NioEvnetLoop 的 select
方法,该方法内部会根据情况,执行 selector 的有参和无参的 select 方法
private int select(long deadlineNanos) throws IOException {
// 如果没有指定阻塞事件,就调用 select()
if (deadlineNanos == NONE) {
return selector.select();
}
// 否则调用 select(timeoutMillis),指定时间内未发生事件就停止阻塞
// Timeout will only be 0 if deadline is within 5 microsecs
long timeoutMillis = deadlineToDelayNanos(deadlineNanos + 995000L) / 1000000L;
return timeoutMillis <= 0 ? selector.selectNow() : selector.select(timeoutMillis);
}
但需要注意的是,select
方法是会阻塞线程的,当没有 IO 事件,但有其他任务需要执行时,需要唤醒线程 唤醒是通过 execute 最后的 if 代码块来完成的
// 有任务需要被执行时,唤醒阻塞的 NIO 线程
if (!addTaskWakesUp && immediate) {
wakeup(inEventLoop);
}
NioEventLoop.wakeup
唤醒被 selector.select 方法阻塞的 NIO 线程
@Override
protected void wakeup(boolean inEventLoop) {
// 只有当其他线程给当前 NIO 线程提交任务时(如执行 execute),才会被唤醒
// 通过 AtomicLong 进行更新,保证每次只能有一个线程唤醒成功
if (!inEventLoop && nextWakeupNanos.getAndSet(AWAKE) != AWAKE) {
// 唤醒被 selector.select 方法阻塞的 NIO 线程
selector.wakeup();
}
}
唤醒时需要进行两个判断
- 判断提交任务的是否为 NIO 线程
- 若是其他线程,才能唤醒 NIO 线程
- 若是 NIO 线程自己,则不能唤醒
- 通过AtomicLong保证有多个线程同时提交任务时,只有一个线程能够唤醒 NIO 线程
2.2.3 SELECT 分支
run 方法的 switch 语句有多条分支,具体执行分支的代码由 strategy 变量控制
int strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());
switch (strategy) {
...
}
strategy 的值由 calculateStrategy
方法确定
@Override
public int calculateStrategy(IntSupplier selectSupplier, boolean hasTasks) throws Exception {
// selectSupplier.get() 底层是 selector.selectNow();
return hasTasks ? selectSupplier.get() : SelectStrategy.SELECT;
}
该方法会根据 hasTaks 变量判断任务队列中是否有任务
- 若有任务,则通过 selectSupplier 获得 strategy 的值
- get 方法会 selectNow 方法,顺便拿到 IO 事件
private final IntSupplier selectNowSupplier = new IntSupplier() { public int get() throws Exception { return NioEventLoop.this.selectNow(); } }; int selectNow() throws IOException { return this.selector.selectNow(); }
- get 方法会 selectNow 方法,顺便拿到 IO 事件
- 若没有任务,就会进入 SELECT 分支
也就说,当任务队列中没有任务时,才会进入 SELECT 分支,让 NIO 线程阻塞,而不是空转。若有任务,则会通过 get
方法调用 selector.selectNow
方法,顺便拿到 IO 事件
2.3 JDK 原生 NIO 空轮询 BUG
JDK NIO 空轮询 BUG 也就是 JDK NIO 在 Linux 系统下的 epoll 空轮询问题 在 NioEventLoop 中,因为 run 方法中存在一个死循环,需要通过 selector.select 方法来阻塞线程。但是 select 方法因为 BUG,可能无法阻塞线程,导致循环一直执行,使得 CPU 负载打满
@Override
protected void run() {
...
for(;;){
...
// 可能发生空轮询,无法阻塞 NIO 线程
strategy = select(curDeadlineNanos);
...
if(...) {
...
} else if (unexpectedSelectorWakeup(selectCnt) ){
// 通过 unexpectedSelectorWakeup 方法中的 rebuildSelector 重建 selector
// 并将 selectCnt 重置为 0
selectCnt = 0;
}
}
}
Netty 中通过 selectCnt
变量来检测 select
方法是否发生空轮询 BUG 若发生空轮询 BUG,那么 selectCnt 的值会增长是十分迅速。当 selectCnt
的值大于等于 SELECTOR_AUTO_REBUILD_THRESHOLD
(默认 512)时,Netty 则判断其出现了空轮询 BUG,进行如下处理
// io.netty.channel.nio.NioEventLoop#select
if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 && selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
// The selector returned prematurely many times in a row.
// Rebuild the selector to work around the problem.
logger.warn("Selector.select() returned prematurely {} times in a row; rebuilding Selector {}.",selectCnt, selector);
// 重建 selector,将原 selector 的配置信息传给新 selector
// 再用新 selector 覆盖旧 selector
rebuildSelector();
return true;
}
通过 rebuildSelector
方法重建 selector,将原 selector 的配置信息传给新 selector,再用新 selector 覆盖旧 selector。同时将 selectCnt 的值设置为 0
2.4 IORatio
NioEventLoop 可以处理 IO 事件和其他任务。不同的操作所耗费的时间是不同的,想要控制 NioEventLoop 处理 IO 事件花费时间占执行所有操作的总时间的比例,需要通过 ioRatio 来控制 NioEventLoop.run 方法
// 处理 IO 事件时间比例,默认为 50%
final int ioRatio = this.ioRatio;
// 如果 IO 事件时间比例设置为 100%
if (ioRatio == 100) {
try {
// 如果需要去处理 IO 事件
if (strategy > 0) {
// 先处理 IO 事件
processSelectedKeys();
}
} finally {
// Ensure we always run tasks.
// 剩下的时间都去处理普通任务和定时任务
ranTasks = runAllTasks();
}
} else if (strategy > 0) { // 如果需要去处理 IO 事件
// 记录处理 IO 事件前的时间
final long ioStartTime = System.nanoTime();
try {
// 去处理 IO 事件
processSelectedKeys();
} finally {
// Ensure we always run tasks.
// ioTime 为处理 IO 事件耗费的事件
final long ioTime = System.nanoTime() - ioStartTime;
// 计算出处理其他任务的事件
// 超过设定的时间后,将会停止任务的执行,会在下一次循环中再继续执行
ranTasks = runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
} else { // 没有 IO 事件需要处理
// This will run the minimum number of tasks
// 直接处理普通和定时任务
ranTasks = runAllTasks(0);
}
通过 ioRatio 控制各个任务执行的过程如下
- 判断 ioRatio 是否为 100
- 若是,判断是否需要处理 IO 事件(strategy>0)
- 若需要处理 IO 事件,则先处理 IO 事件
- 若否(或 IO 事件已经处理完毕),接下来去执行所有的普通任务和定时任务,直到所有任务都被处理完
// 没有指定执行任务的时间 ranTasks = runAllTasks();
- 若是,判断是否需要处理 IO 事件(strategy>0)
- 若 ioRatio 不为 100
- 先去处理 IO 事件,记录处理 IO 事件所花费的事件保存在 ioTime 中
- 接下来去处理其他任务,根据 ioTime 与 ioRatio 计算执行其他任务可用的时间
// 比如 ioTime 为 10s,ioRatio 为 50 // 那么通过 10*(100-50)/50=10 计算出其他任务可用的时间为 10s // 处理 IO 事件占用的事件总比例为 50% ranTasks = runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
- 执行其他任务一旦超过可用时间,则会停止执行,在下一次循环中再继续执行
- 若没有 IO 事件需要处理,则去执行最少数量的普通任务和定时任务
// 运行最少数量的任务 ranTasks = runAllTasks(0);
但是此机制无法避免单个任务执行时间过长导致的线程阻塞,因为 runAllTasks
是在每执行一个任务后判断是否超时,如果超时就 break 返回。
2.5 处理事件
IO 事件是通过 NioEventLoop.processSelectedKeys()
方法处理的
private void processSelectedKeys() {
// 如果 selectedKeys 是基于数组的
// 一般情况下都走这个分支
if (selectedKeys != null) {
// 处理各种 IO 事件
processSelectedKeysOptimized();
} else {
processSelectedKeysPlain(selector.selectedKeys());
}
}
processSelectedKeysOptimized 方法
private void processSelectedKeysOptimized() {
for (int i = 0; i < selectedKeys.size; ++i) {
// 拿到 SelectionKeyec
final SelectionKey k = selectedKeys.keys[i];
// null out entry in the array to allow to have it GC'ed once the Channel close
// See https://github.com/netty/netty/issues/2363
selectedKeys.keys[i] = null;
// 获取 SelectionKey 上的附件,即 NioServerSocketChannel
final Object a = k.attachment();
if (a instanceof AbstractNioChannel) {
// 处理事件,传入附件 NioServerSocketChannel
processSelectedKey(k, (AbstractNioChannel) a);
} else {
@SuppressWarnings("unchecked")
NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
processSelectedKey(k, task);
}
if (needsToSelectAgain) {
// null out entries in the array to allow to have it GC'ed once the Channel close
// See https://github.com/netty/netty/issues/2363
selectedKeys.reset(i + 1);
selectAgain();
i = -1;
}
}
}
该方法中通过 fori 的方法,遍历基于数组的 SelectedKey,通过
final SelectionKey k = selectedKeys.keys[i];
获取到 SelectionKey,然后获取其再 Register 时添加的附件 NioServerSocketChannel
// 获取 SelectionKey 上的附件,即 NioServerSocketChannel
final Object a = k.attachment();
如果附件继承自 AbstractNioChannel,则会调用
// 处理事件,传入附件 NioServerSocketChannel
processSelectedKey(k, (AbstractNioChannel) a);
去处理各个事件 真正处理各种事件的方法 processSelectedKey
获取 SelectionKey 的事件,然后进行相应处理
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
if (!k.isValid()) {
final EventLoop eventLoop;
try {
eventLoop = ch.eventLoop();
} catch (Throwable ignored) {
// If the channel implementation throws an exception because there is no event loop, we ignore this
// because we are only trying to determine if ch is registered to this event loop and thus has authority
// to close ch.
return;
}
// Only close ch if ch is still registered to this EventLoop. ch could have deregistered from the event loop
// and thus the SelectionKey could be cancelled as part of the deregistration process, but the channel is
// still healthy and should not be closed.
// See https://github.com/netty/netty/issues/5125
if (eventLoop == this) {
// close the channel if the key is not valid anymore
unsafe.close(unsafe.voidPromise());
}
return;
}
try {
int readyOps = k.readyOps();
// We first need to call finishConnect() before try to trigger a read(...) or write(...) as otherwise
// the NIO JDK channel implementation may throw a NotYetConnectedException.
if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
// remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
// See https://github.com/netty/netty/issues/924
int ops = k.interestOps();
ops &= ~SelectionKey.OP_CONNECT;
k.interestOps(ops);
unsafe.finishConnect();
}
// Process OP_WRITE first as we may be able to write some queued buffers and so free memory.
if ((readyOps & SelectionKey.OP_WRITE) != 0) {
// Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
ch.unsafe().forceFlush();
}
// Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
// to a spin loop
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
unsafe.read();
}
} catch (CancelledKeyException ignored) {
unsafe.close(unsafe.voidPromise());
}
}
三. Accept 事件剖析
3.1 整体流程
NIO 中处理 Accept 事件主要有以下六步
- selector.select() 阻塞线程,直到事件发生
- 遍历 selectionKeys
- 获取一个 key,判断事件类型是否为 Accept
- 创建 SocketChannel,设置为非阻塞
- 将 SocketChannel 注册到 selector 中
- 关注 selectionKeys 的 read 事件
代码如下
// 阻塞直到事件发生
selector.select();
Iterator<SelectionKey> iter = selector.selectionKeys().iterator();
while (iter.hasNext()) {
// 拿到一个事件
SelectionKey key = iter.next();
// 如果是 accept 事件
if (key.isAcceptable()) {
// 执行 accept,获得 SocketChannel
SocketChannel channel = serverSocketChannel.accept();
channel.configureBlocking(false);
// 将 SocketChannel 注册到 selector 中,并关注 read 事件
channel.register(selector, SelectionKey.OP_READ);
}
// ...
}
其中前三步,在 NioEventLoop 剖析中已经分析过了,所以接下来主要分析后三步
3.2 SocketChannel 的创建与注册
发生 Accept 事件后,会执行 NioEventLoop.run
方法的如下 if 分支
//io.netty.channel.nio.NioEventLoop#processSelectedKey(java.nio.channels.SelectionKey, io.netty.channel.nio.AbstractNioChannel)
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
unsafe.read();
}
NioMessageUnsafe.read
方法
public void read() {
...
try {
try {
do {
// doReadMessages 中执行了 accept 获得了 SocketChannel
// 并创建 NioSocketChannel 作为消息放入 readBuf
// readBuf 是一个 ArrayList 用来缓存消息
// private final List<Object> readBuf = new ArrayList<Object>();
int localRead = doReadMessages(readBuf);
...
// localRead 值为 1,就一条消息,即接收一个客户端连接
allocHandle.incMessagesRead(localRead);
} while (allocHandle.continueReading());
} catch (Throwable t) {
exception = t;
}
int size = readBuf.size();
for (int i = 0; i < size; i ++) {
readPending = false;
// 触发 read 事件,让 pipeline 上的 handler 处理
// ServerBootstrapAcceptor.channelRead
pipeline.fireChannelRead(readBuf.get(i));
}
...
} finally {
if (!readPending && !config.isAutoRead()) {
removeReadOp();
}
}
}
NioSocketChannel.doReadMessages
方法 该方法中处理 accpet 事件,获得 SocketChannel,同时创建了 NioSocketChannel,作为消息放在了 readBuf 中
//io.netty.channel.socket.nio.NioServerSocketChannel#doReadMessages
@Override
protected int doReadMessages(List<Object> buf) throws Exception {
// 处理 accpet 事件,获得 SocketChannel
SocketChannel ch = SocketUtils.accept(javaChannel());
try {
if (ch != null) {
// 创建了 NioSocketChannel,作为消息放在了 readBuf 中
buf.add(new NioSocketChannel(this, ch));
return 1;
}
} catch (Throwable t) {
...
}
return 0;
}
在调用 read 方法末尾,会使用 pipeline.fireChannelRead(readBuf.get(i))
触发 NioServerSocketChannel
上配置的流水线 Handler 的 channelRead 事件,也就是 ServerBootstrapAcceptor.channelRead()
(该 Handler 是在 NioServerSocketChannel 初始化的过程中设置的,详见 1.3.2):
public void channelRead(ChannelHandlerContext ctx, Object msg) {
// 这时的 msg 是 NioSocketChannel
final Channel child = (Channel) msg;
// NioSocketChannel 添加 childHandler,即初始化器
child.pipeline().addLast(childHandler);
// 设置选项
setChannelOptions(child, childOptions, logger);
for (Entry<AttributeKey<?>, Object> e: childAttrs) {
child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
}
try {
// 注册 NioSocketChannel 到 nio worker 线程,接下来的处理也移交至 nio worker 线程
childGroup.register(child).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
forceClose(child, future.cause());
}
}
});
} catch (Throwable t) {
forceClose(child, t);
}
}
通过 AbstractUnsafe.register
方法,将 SocketChannel 注册到了 Selector 中,过程与启动流程中的 Register 过程类似
// io.netty.channel.AbstractChannel.AbstractUnsafe#register
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
...
AbstractChannel.this.eventLoop = eventLoop;
if (eventLoop.inEventLoop()) {
register0(promise);
} else {
try {
// 这行代码完成的是 nio boss -> nio worker 线程的切换
eventLoop.execute(new Runnable() {
@Override
public void run() {
// 真正的注册操作
register0(promise);
}
});
} catch (Throwable t) {
...
}
}
}
AbstractChannel.AbstractUnsafe.register0
//io.netty.channel.AbstractChannel.AbstractUnsafe#register0
private void register0(ChannelPromise promise) {
try {
...
// 该方法将 SocketChannel 注册到 Selector 中
doRegister();
// 执行初始化器,执行前 pipeline 中只有 head -> 初始化器 -> tail
pipeline.invokeHandlerAddedIfNeeded();
// 执行后就是 head -> 自定义 handler -> tail
safeSetSuccess(promise);
pipeline.fireChannelRegistered();
if (isActive()) {
if (firstRegistration) {
// 触发 pipeline 上 active 事件
pipeline.fireChannelActive();
} else if (config().isAutoRead()) {
beginRead();
}
}
} catch (Throwable t) {
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
AbstractNioChannel.doRegister
将 SocketChannel 注册到 Selector 中
// io.netty.channel.nio.AbstractNioChannel#doRegister
@Override
protected void doRegister() throws Exception {
boolean selected = false;
for (;;) {
try {
// 将 Selector 注册到 Selector 中
selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
return;
} catch (CancelledKeyException e) {
...
}
}
}
HeadContext.channelActive
public void channelActive(ChannelHandlerContext ctx) {
ctx.fireChannelActive();
// 触发 read(NioSocketChannel 这里 read 只是为了触发 channel 的事件注册,还未涉及数据读取)
readIfIsAutoRead();
}
AbstractNioChannel.doBeginRead
,通过该方法,SocketChannel 关注了 read 事件
protected void doBeginRead() throws Exception {
// Channel.read() or ChannelHandlerContext.read() was called
final SelectionKey selectionKey = this.selectionKey;
if (!selectionKey.isValid()) {
return;
}
readPending = true;
// 这时候 interestOps 是 0
final int interestOps = selectionKey.interestOps();
if ((interestOps & readInterestOp) == 0) {
// 关注 read 事件
selectionKey.interestOps(interestOps | readInterestOp);
}
}
四. read 事件剖析
read 事件的处理也是在
//io.netty.channel.nio.NioEventLoop#processSelectedKey(java.nio.channels.SelectionKey, io.netty.channel.nio.AbstractNioChannel)
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
unsafe.read();
}
分支中,通过 unsafe.read()
方法处理的,不过此处调用的方法在 AbstractNioByteChannel.NioByteUnsafe 类中
@Override
public final void read() {
// 获得 Channel 的配置
final ChannelConfig config = config();
if (shouldBreakReadReady(config)) {
clearReadPending();
return;
}
final ChannelPipeline pipeline = pipeline();
// 根据配置创建 ByteBufAllocator(池化非池化、直接非直接内存)
final ByteBufAllocator allocator = config.getAllocator();
// 用来分配 byteBuf,确定单次读取大小
final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
allocHandle.reset(config);
ByteBuf byteBuf = null;
boolean close = false;
try {
do {
// 创建 ByteBuf
byteBuf = allocHandle.allocate(allocator);
// 读取内容,放入 ByteBUf 中
allocHandle.lastBytesRead(doReadBytes(byteBuf));
if (allocHandle.lastBytesRead() <= 0) {
byteBuf.release();
byteBuf = null;
close = allocHandle.lastBytesRead() < 0;
if (close) {
readPending = false;
}
break;
}
allocHandle.incMessagesRead(1);
readPending = false;
// 触发 read 事件,让 pipeline 上的 handler 处理
// 这时是处理 NioSocketChannel 上的 handler
pipeline.fireChannelRead(byteBuf);
byteBuf = null;
}
// 是否要继续循环
while (allocHandle.continueReading());
allocHandle.readComplete();
// 触发 read complete 事件
pipeline.fireChannelReadComplete();
if (close) {
closeOnRead(pipeline);
}
} catch (Throwable t) {
handleReadException(pipeline, byteBuf, t, close, allocHandle);
} finally {
// Check if there is a readPending which was not processed yet.
// This could be for two reasons:
// * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
// * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
//
// See https://github.com/netty/netty/issues/2254
if (!readPending && !config.isAutoRead()) {
removeReadOp();
}
}
}
DefaultMaxMessagesRecvByteBufAllocator.MaxMessageHandle.continueReading(io.netty.util.UncheckedBooleanSupplier)
public boolean continueReading(UncheckedBooleanSupplier maybeMoreDataSupplier) {
return
// 一般为 true
config.isAutoRead() &&
// respectMaybeMoreData 默认为 true
// maybeMoreDataSupplier 的逻辑是如果预期读取字节与实际读取字节相等,返回 true
(!respectMaybeMoreData || maybeMoreDataSupplier.get()) &&
// 小于最大次数,maxMessagePerRead 默认 16
totalMessages < maxMessagePerRead &&
// 实际读到了数据
totalBytesRead > 0;
}
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

上一篇: Netty 在线聊天室
下一篇: 谈谈自己对于 AOP 的了解
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论