返回介绍

5.11 读完了再通知我:AIO

发布于 2024-08-21 22:20:21 字数 6052 浏览 0 评论 0 收藏 0

AIO是异步IO的缩写,即Asynchronized。虽然NIO在网络操作中,提供了非阻塞的方法,但是NIO的IO行为还是同步的。对于NIO来说,我们的业务线程是在IO操作准备好时,得到通知,接着就由这个线程自行进行IO操作,IO操作本身还是同步的。

但对于AIO来说,则更加进了一步,它不是在IO准备好时再通知线程,而是在IO操作已经完成后,再给线程发出通知。因此,AIO是完全不会阻塞的。此时,我们的业务逻辑将变成一个回调函数,等待IO操作完成后,由系统自动触发。

下面,我将通过AIO来实现一个简单的EchoServer以及对应的客户端。

5.11.1 AIO EchoServer的实现

异步IO需要使用异步通道(AsynchronousServerSocketChannel):

public final static int PORT = 8000;
private AsynchronousServerSocketChannel server;
public AIOEchoServer() throws IOException {
  server = AsynchronousServerSocketChannel.open().bind(new InetSocketAddress(PORT));
}

上述代码绑定了8000端口为服务器端口,并使用AsynchronousServerSocketChannel异步Channel作为服务器,变量名为server。

我们使用这个server来进行客户端的接收和处理:

01 public void start() throws InterruptedException, ExecutionException, TimeoutException {
02   System.out.println("Server listen on " + PORT);
03   //注册事件和事件完成后的处理器
04   server.accept(null, new CompletionHandler<AsynchronousSocketChannel, Object>() {
05     final ByteBuffer buffer = ByteBuffer.allocate(1024);
06     public void completed(AsynchronousSocketChannel result, Object attachment) {
07       System.out.println(Thread.currentThread().getName());
08       Future<Integer> writeResult=null;
09       try {
10         buffer.clear();
11         result.read(buffer).get(100, TimeUnit.SECONDS);
12         buffer.flip();
13         writeResult=result.write(buffer);
14       } catch (InterruptedException | ExecutionException e) {
15         e.printStackTrace();
16       } catch (TimeoutException e) {
17         e.printStackTrace();
18       } finally {
19         try {
20           server.accept(null, this);
21           writeResult.get();
22           result.close();
23         } catch (Exception e) {
24           System.out.println(e.toString());
25         }
26       }
27     }
28
29     @Override
30     public void failed(Throwable exc, Object attachment) {
31       System.out.println("failed: " + exc);
32     }
33   });
34 }

上述定义的start()方法开启了服务器。值得注意的是,这个方法除了第2行的打印语句外,只调用了一个函数server.accept()。之后,你看到的那一大堆代码只是这个函数的参数。

AsynchronousServerSocketChannel.accept()方法会立即返回。它并不会真的去等待客户端的到来。在这里使用的accept()方法的签名为:

public final <A> void accept(A attachment,
           CompletionHandler<AsynchronousSocketChannel,? super A> handler)

它的第一个参数是一个附件,可以是任意类型,作用是让当前线程和后续的回调方法可以共享信息,它会在后续调用中,传递给handler。它的第二个参数是CompletionHandler接口。这个接口有两个方法:

void completed(V result, A attachment);
void failed(Throwable exc, A attachment);

这两个方法分别在异步操作accept()成功或者失败时被回调。

因此AsynchronousServerSocketChannel.accept()实际上做了两件事,第一是发起accept请求,告诉系统可以开始监听端口了。第二,注册CompletionHandler实例,告诉系统,一旦有客户端前来连接,如果成功连接,就去执行CompletionHandler.completed()方法;如果连接失败,就去执行CompletionHandler.failed()方法。

所以,server.accept()方法不会阻塞,它会立即返回。

下面,来分析一下CompletionHandler.completed()的实现。当completed()被执行时,意味着已经有客户端成功连接了。在第11行,使用read()方法读取客户的数据。这里要注意,AsynchronousSocketChannel.read()方法也是异步的,换句话说它不会等待读取完成了再返回,而是立即返回,返回的结果是一个Future,因此这里就是Future模式的典型应用。为了编程方便,我在这里直接调用Future.get()方法,进行等待,将这个异步方法变成了同步方法。因此,在第11行执行完成后,数据读取就已经完成了。

之后,将数据回写给客户端(第13行)。这里调用的是AsynchronousSocketChannel.write()方法。这个方法不会等待数据全部写完,也是立即返回的。同样,它返回的也是Future对象。

再之后,在第20行,服务器进行下一个客户端连接的准备。同时关闭当前正在处理的客户端连接。但在关闭之前,得先确保之前的write()操作已经完成,因此,使用Future.get()方法进行等待(第21行)。

接下来,我们只需要在主函数中调用这个start()方法就可以开启服务器了:

01 public static void main(String args[]) throws Exception {
02   new AIOEchoServer().start();
03   // 主线程可以继续自己的行为
04   while (true) {
05     Thread.sleep(1000);
06   }
07 }

上述代码第2行,调用start()方法开启服务器。但由于start()方法里使用的都是异步方法,因此它会马上返回,它并不像阻塞方法那样会进行等待。因此,如果想让程序驻守执行,第4~6行的等待语句是必需的。否则,在start()方法结束后,不等客户端到来,程序已经运行完成,主线程就将退出。

5.11.2 AIO Echo客户端实现

在服务端的实现中,我们使用Future.get()方法将异步调用转为了一个同步等待。在客户端的实现里,我们将全部使用异步回调实现:

01 public class AIOClient {
02   public static void main(String[] args) throws Exception {
03     final AsynchronousSocketChannel client = AsynchronousSocketChannel.open();
04   client.connect(new InetSocketAddress("localhost", 8000), null, new CompletionHandler<Void, Object>() {
05       @Override
06       public void completed(Void result, Object attachment) {
07  client.write(ByteBuffer.wrap("Hello!".getBytes()), null, new CompletionHandler<Integer, Object>() {
08           @Override
09           public void completed(Integer result, Object attachment) {
10             try {
11               ByteBuffer buffer = ByteBuffer.allocate(1024);
12           client.read(buffer,buffer,new CompletionHandler<Integer, ByteBuffer>(){
13                 @Override
14                 public void completed(Integer result, ByteBuffer buffer) {
15                   buffer.flip();
16                   System.out.println(new String(buffer.array()));
17                   try {
18                     client.close();
19                   } catch (IOException e) {
20                     e.printStackTrace();
21                   }
22                 }
23                 @Override
24                 public void failed(Throwable exc, ByteBuffer attachment) {
25                 }
26               });
27             } catch (Exception e) {
28               e.printStackTrace();
29             }
30           }
31           @Override
32           public void failed(Throwable exc, Object attachment) {
33           }
34         });
35       }
36       @Override
37       public void failed(Throwable exc, Object attachment) {
38       }
39     });
40     //由于主线程马上结束,这里等待上述处理全部完成
41     Thread.sleep(1000);
42   }
43 }

上面的AIO客户端看起来代码很长,但实际上只有三个语句。

第一个语句为第3行,打开AsynchronousSocketChannel通道。第二个语句是第4~39行,它让客户端去连接指定的服务器,并注册了一系列事件。第三个语句是第41行,让线程进行等待。虽然第2个语句看起来很长,但是它完全是异步的,因此会很快返回,并不会等待在连接操作的过程中。如果不进行等待,客户端会马上退出,也就无法继续工作了。

第4行,客户端进行网络连接,并注册了连接成功的回调函数CompletionHandler<Void, Object>。待连接成功后,就会进入代码第7行。第7行进行数据写入,向服务端发送数据。这个过程也是异步的,会很快返回。写入完成后,会通知回调接口CompletionHandler<Integer, Object>,进入第10行。第10行开始,准备进行数据读取,从服务端读取回写的数据。当然,第12行的read()函数也是立即返回的,成功读取所有数据后,会回调CompletionHandler<Integer, ByteBuffer>接口,进入第15行。在第15~16行,打印接收到的数据。

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。
列表为空,暂无数据
    我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
    原文