同步与异步,阻塞与非阻塞

发布于 2022-09-12 12:55:28 字数 51 浏览 46 评论 0

“同步与异步,阻塞与非阻塞”这个概念,在讨论io和并发两个场景中有什么不同?该怎么理解?

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(4

野生奥特曼 2022-09-19 12:55:28

今天早上关注了这个问题,刚抽出时间大概整理下,以下仅是个人理解:

一定要多看几遍代码并结合文字理解下


0、从I/O说起
这些概念之所以容易令人迷惑,在于很多人对I/O就没有清晰准确的理解,后面的理解自然不可能正确。
我想用一个具体的例子来说明一下I/O
设想自己是一个进程,就叫小进吧。小进需要接收一个输入,我们不管这个输入是从网络套接字来,还是键盘,鼠标来,输入的来源可以千千万万。但是,都必须由内核来帮小进完成,为啥内核这么霸道?因为计算机上运行的可不只是咱小进一个进程,还有很多进程。这些进程兄弟也可能需要从这些输入设备接收输入,没有内核居中协调,岂不是乱套。
从小进的角度看,内核帮助它完成输入,其实包括三个步骤:

  • 内核替小进接收好数据,这些数据暂时存在内核的内存空间
  • 内核将数据从自己的内存空间复制到小进的内存空间
  • 告诉小进,输入数据来了,赶快读吧

这三步看似挺简单,其实在具体实现时,有很多地方需要考虑:

  • 小进如何告诉内核自己要接收一个输入?
  • 内核接到小进的请求,替小进接收好数据这段时间, 小进咋办?
  • 内核在将数据复制到小进的内存空间这段时间,小进咋办?
  • 到底什么时候告诉小进数据准备好了,是在内核接收好数据之后就告诉小进,还是在将数据复制到小进的内存空间之后再告诉他?
  • 内核以什么样的方式告诉小进,数据准备好了?

1、阻塞式I/O模型
对上面5个问题,最简单的解决方案就是阻塞式I/O模型,它的过程是这样的:
小进:内核内核,我要接收一个键盘输入,快点帮我完成!
内核:好咧!biubiu!一个阻塞丢给小进,小进顿时石化,就像被孙悟空点了定一样。
就这样,小进在石化中,时间一点点流逝。终于,内核收到了数据。
内核:数据终于来了,我要开干了!duang duang duang,先把数据存在自己的内核空间,然后又复制到小进的用户空间。
内核:biubiu!一个解除阻塞丢给小进,小进瞬间复活,小进的记忆还是停留在让内核帮他接收输入时。
小进:哇!内核真靠谱,数据已经有了!干活去!
我们可以看到,小进发出接收输入的请求给内核开始,就处于阻塞状态,直到内核将数据复制到小进的用户空间,小进才解除阻塞。
2、非阻塞式I/O
小进发现,阻塞式I/O中,自己总要被阻塞好久,好不爽啊,于是小进改用了非阻塞式I/O,其过程是这样的:
小进:内核内核,我要接收一个输入,赶紧帮我看看,数据到了没有,先说好,不要阻塞我。
内核:查看了一下自己的内核空间,没有发现数据,于是迅速告诉小进,没有呢!并继续帮小进等着数据。
如此这样,小进不断地问内核,终于,过了一段时间,小进再一次询问时,内核往自己的空间中一查,呦!数据来了,不胜其烦的内核迅速告诉小进,数据好了!
小进:快给我!
内核:biu!一个阻塞丢给小进,悲催的小进还是石化了!
内核赶紧将自己空间的输入数据复制到小进的用户空间,复制好后。
内核:biu!一个非阻塞丢给小进,小进立马复活
小进:哇!数据来了,啥也不说,干活!
我们看到,所谓的非阻塞I/O,其实在内核将数据从内核空间复制到小进的用户空间时,小进还是被阻塞的。
3、信号驱动式I/O
非阻塞I/O中,小进不停地问内核,数据好了没有啊,内核感觉太烦了,于是想出一个好办法。
内核告诉小进,本内核升级了,如果想要我替你接收输入,请先注册一个信号处理函数,等数据准备好时,我会发信号给你。于是,现在的流程是这样的:
小进:注册信号处理函数,告诉内核,自己要接收一个输入,然后继续干活!
内核:收到函数,开始执行数据接收
接收完成时,给小进发送信号,信号处理函数收到信号,开始向内核发送读数据请求
内核:biu!阻塞了小进,并把数据从内核空间复制到小进的用户空间。
内核:biu!解除了阻塞
小进:哇!数据来了!啥也不说,干活去!
4、异步I/O
上面的三种I/O解决方案中,小进都被阻塞了,只不过是阻塞时间长短不一样,第一种方案中小进被阻塞的时间长一些,在内核接收数据以及将数据复制到小进的用户空间时,都被阻塞。
第二、第三种方案中,只在内核将数据从内核空间复制到小进的用户空间时,小进才被阻塞。
我们现在说的异步I/O,目的就是让小进绝对不被阻塞。其过程是这样的:
小进:内核内核,我要接收一个输入,弄好了告诉我。同时将一个信号和信号处理函数告诉内核,然后继续干自己的活了。
内核:得了您嘞,您先忙。
一直到内核接收到数据并将数据从内核空间复制到小进的用户空间后,内核才给小进发送信号。小进在信号处理函数中可以直接处理数据。

引自

1、阻塞式I/O式

客户端代码


public class Client {

    public static void main(String[] args) {
        Socket socket = null;
        try {
            System.out.println("socket begin " + System.currentTimeMillis());
            // 随机绑定本地地址与端口
            socket = new Socket("localhost", 8888);
            System.out.println("socket   end " + System.currentTimeMillis());
            OutputStream os = socket.getOutputStream();
            Random ran = new Random();
            for (int n = 0; n < 10; n++) {
                System.out.println("send message " + n);
                os.write(("hello server form " + socket.getLocalAddress().getHostAddress() + " - " + n).getBytes());
                try {
                    TimeUnit.SECONDS.sleep(ran.nextInt(10));
                } catch (InterruptedException e) {
                    break;
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            try {
                if (socket != null) {
                    // 自动关闭绑定流
                    socket.close();
                }
                System.out.println("exit");
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}

服务端代码


public class Server {

    public static void main(String[] args) {
        ServerSocket serverSocket = null;
        Socket socket = null;
        ThreadPoolExecutor executor = null;
        try {
            // 初始化线程池
            executor = new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors(),
                    Runtime.getRuntime().availableProcessors() * 2,
                    0L, TimeUnit.SECONDS, new LinkedBlockingDeque<Runnable>(), new ThreadBuilder());

            // 监听通配符地址
            serverSocket = new ServerSocket(8888);
            System.out.println("accept begin " + System.currentTimeMillis());
            while ((socket = serverSocket.accept()) != null) {
                executor.execute(new Task(socket));
            }
            System.out.println("accept   end " + System.currentTimeMillis());
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            executor.shutdown();
            try {
                if (serverSocket != null) {
                    serverSocket.close();
                }
                System.out.println("exit");
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    static class ThreadBuilder implements ThreadFactory {
        private AtomicInteger counter = new AtomicInteger(0);

        @Override
        public Thread newThread(Runnable r) {
            Thread thread = new Thread(r, "server-thread-" + counter.getAndIncrement());
            thread.setUncaughtExceptionHandler((t, e) -> {
                if (e instanceof TaskException) {
                    System.err.println(t.getName() + "|" + e.getCause().getMessage());
                } else {
                    System.err.println(t.getName() + "|" + e.getMessage());
                }
            });
            return thread;
        }
    }

    static class Task implements Runnable {

        private byte[] buffer = new byte[10 * 1024];
        private Socket socket;

        public Task(Socket socket) {
            this.socket = socket;
        }

        @Override
        public void run() {
            try {
                InputStream is = socket.getInputStream();
                System.out.println("--------------------------------------------------");
                System.out.println("read begin " + System.currentTimeMillis());
                System.out.println("***");
                int len = is.read(buffer);// 呈阻塞效果
                while (len != -1) {
                    String str = new String(buffer, 0, len);
                    System.out.println(str);
                    len = is.read(buffer);
                }
                System.out.println("***");
                System.out.println("read   end " + System.currentTimeMillis());
                System.out.println("--------------------------------------------------");
            } catch (IOException e) {
                throw new TaskException(e);
            } finally {
                if (socket != null) {
                    try {
                        // 自动关闭绑定流
                        socket.close();
                        System.out.println("socket closed");
                    } catch (IOException e) {
                        System.err.println("socket close failed");
                    }
                }
            }
        }
    }

    static class TaskException extends RuntimeException {
        public TaskException(Throwable cause) {
            super(cause);
        }
    }
}

2、非阻塞式I/O

客户端代码同上


服务端代码


public class Server {

    public static void main(String[] args) {
        ServerSocketChannel serverSocket = null;
        SocketChannel socket = null;
        ThreadPoolExecutor executor = null;
        try {
            // 初始化线程池
            executor = new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors(),
                    Runtime.getRuntime().availableProcessors() * 2,
                    0L, TimeUnit.SECONDS, new LinkedBlockingDeque<Runnable>(), new ThreadBuilder());

            serverSocket = ServerSocketChannel.open();
            // 设置阻塞
            serverSocket.configureBlocking(true);
            // 监听通配符地址
            serverSocket.bind(new InetSocketAddress(8888));
            System.out.println("accept begin " + System.currentTimeMillis());
            while ((socket = serverSocket.accept()) != null) {
                // 设置非阻塞
                socket.configureBlocking(false);
                executor.execute(new Task(socket));
            }
            System.out.println("accept   end " + System.currentTimeMillis());
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            executor.shutdown();
            try {
                if (serverSocket != null) {
                    serverSocket.close();
                }
                System.out.println("exit");
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    static class ThreadBuilder implements ThreadFactory {
        private AtomicInteger counter = new AtomicInteger(0);

        @Override
        public Thread newThread(Runnable r) {
            Thread thread = new Thread(r, "server-thread-" + counter.getAndIncrement());
            thread.setUncaughtExceptionHandler((t, e) -> {
                if (e instanceof TaskException) {
                    System.err.println(t.getName() + "|" + e.getCause().getMessage());
                } else {
                    System.err.println(t.getName() + "|" + e.getMessage());
                }
            });
            return thread;
        }
    }

    static class Task implements Runnable {

        private ByteBuffer buffer = ByteBuffer.allocate(10 * 1024);
        private SocketChannel socket;

        public Task(SocketChannel socket) {
            this.socket = socket;
        }

        @Override
        public void run() {
            try {
                System.out.println("--------------------------------------------------");
                System.out.println("read begin " + System.currentTimeMillis());
                System.out.println("***");
                socket.read(buffer);// 呈阻塞效果
                while (true) {
                    if (buffer.position() == 0) {
                        try {
                            TimeUnit.MILLISECONDS.sleep(200);
                        } catch (InterruptedException e) {
                            continue;
                        }
                    } else {
                        buffer.flip();
                        String str = new String(buffer.array(), 0, buffer.limit());
                        System.out.println(str);
                        if ("exit".equals(str)) {
                            break;
                        }
                        buffer.clear();
                    }
                    socket.read(buffer);
                }
                System.out.println("***");
                System.out.println("read   end " + System.currentTimeMillis());
                System.out.println("--------------------------------------------------");
            } catch (IOException e) {
                throw new TaskException(e);
            } finally {
                if (socket != null) {
                    try {
                        // 自动关闭绑定流
                        socket.close();
                        System.out.println("socket closed");
                    } catch (IOException e) {
                        System.err.println("socket close failed");
                    }
                }
            }
        }
    }

    static class TaskException extends RuntimeException {
        public TaskException(Throwable cause) {
            super(cause);
        }
    }
}

3、多路复用式I/O(基于非阻塞式I/O)

客户端代码同上


服务端代码


public class Server {

    public static void main(String[] args) {
        Selector selector = null;
        ServerSocketChannel serverSocket = null;
        SocketChannel socket = null;
        ThreadPoolExecutor executor = null;
        try {
            // 初始化线程池
            executor = new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors(),
                    Runtime.getRuntime().availableProcessors() * 2,
                    0L, TimeUnit.SECONDS, new LinkedBlockingDeque<Runnable>(), new ThreadBuilder());

            selector = Selector.open();
            serverSocket = ServerSocketChannel.open();
            // 设置非阻塞
            serverSocket.configureBlocking(false);
            // 监听通配符地址
            serverSocket.bind(new InetSocketAddress(8888));
            serverSocket.register(selector, SelectionKey.OP_ACCEPT);

            System.out.println("accept begin " + System.currentTimeMillis());
            while (true) {
                selector.select();
                Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
                while (iterator.hasNext()) {
                    SelectionKey key = iterator.next();
                    if (key.isAcceptable()) {
                        ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel();
                        SocketChannel socketChannel = serverSocketChannel.accept();
                        // 设置非阻塞
                        socketChannel.configureBlocking(false);
                        socketChannel.register(selector, SelectionKey.OP_READ);
                    } else if (key.isReadable()) {
                        SocketChannel socketChannel = (SocketChannel) key.channel();
                        executor.execute(new Task(socketChannel));
                        key.cancel();
                    } else {
                        // TODO 写事件注册
                    }
                    iterator.remove();
                }
            }
            // System.out.println("accept   end " + System.currentTimeMillis());
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            executor.shutdown();
            try {
                if (serverSocket != null) {
                    serverSocket.close();
                }
                System.out.println("exit");
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    static class ThreadBuilder implements ThreadFactory {
        private AtomicInteger counter = new AtomicInteger(0);

        @Override
        public Thread newThread(Runnable r) {
            Thread thread = new Thread(r, "server-thread-" + counter.getAndIncrement());
            thread.setUncaughtExceptionHandler((t, e) -> {
                if (e instanceof TaskException) {
                    System.err.println(t.getName() + "|" + e.getCause().getMessage());
                } else {
                    System.err.println(t.getName() + "|" + e.getMessage());
                }
            });
            return thread;
        }
    }

    static class Task implements Runnable {

        private ByteBuffer buffer = ByteBuffer.allocate(10 * 1024);
        private SocketChannel socket;

        public Task(SocketChannel socket) {
            this.socket = socket;
        }

        @Override
        public void run() {
            try {
                System.out.println("--------------------------------------------------");
                System.out.println("read begin " + System.currentTimeMillis());
                System.out.println("***");
                socket.read(buffer);// 呈阻塞效果
                while (true) {
                    if (buffer.position() == 0) {
                        try {
                            TimeUnit.MILLISECONDS.sleep(200);
                        } catch (InterruptedException e) {
                            continue;
                        }
                    } else {
                        buffer.flip();
                        String str = new String(buffer.array(), 0, buffer.limit());
                        System.out.println(str);
                        if ("exit".equals(str)) {
                            break;
                        }
                        buffer.clear();
                    }
                    socket.read(buffer);
                }
                System.out.println("***");
                System.out.println("read   end " + System.currentTimeMillis());
                System.out.println("--------------------------------------------------");
            } catch (IOException e) {
                throw new TaskException(e);
            } finally {
                if (socket != null) {
                    try {
                        // 自动关闭绑定流
                        socket.close();
                        System.out.println("socket closed");
                    } catch (IOException e) {
                        System.err.println("socket close failed");
                    }
                }
            }
        }
    }

    static class TaskException extends RuntimeException {
        public TaskException(Throwable cause) {
            super(cause);
        }
    }
}

4、信号驱动式I/O

JAVA没有实现

5、异步I/O

客户端代码同上


服务端代码


public class Server {

    public static void main(String[] args) throws IOException, InterruptedException, ExecutionException {
        AsynchronousServerSocketChannel serverSocketChannel = AsynchronousServerSocketChannel.open();
        serverSocketChannel.bind(new InetSocketAddress(8888));
        serverSocketChannel.accept(null, new CompletionHandler<AsynchronousSocketChannel, Void>() {
            public void completed(AsynchronousSocketChannel asc, Void att) {
                serverSocketChannel.accept(null, this);
                ByteBuffer byteBuffer = ByteBuffer.allocate(10 * 1024);
                asc.read(byteBuffer, null, new CompletionHandler<Integer, Void>() {
                    @Override
                    public void completed(Integer result, Void attachment) {
                        byteBuffer.flip();
                        System.out.println(new String(byteBuffer.array(), 0, byteBuffer.limit()));
                        byteBuffer.clear();
                        try {
                            asc.close();
                        } catch (IOException e) {
                        }
                    }

                    @Override
                    public void failed(Throwable exc, Void attachment) {
                    }
                });
            }

            public void failed(Throwable exc, Void att) {
            }
        });
        for (; ; ) {

        }
    }
}

牛奶工送牛奶场景

  1. 阻塞式:每天早上自己去小区门口等牛奶工
  2. 非阻塞式:每天早上在家从窗户早上隔3分钟看看牛奶工到了没,到了的话去拿
  3. 多路复用式:每天早上由小区门卫室接待所有牛奶工,到了会给住户发短信,你马上去拿
  4. 信号驱动式:每天早上牛奶工到了会给你发短信,你马上去拿
  5. 异步式:每天早上牛奶工直接放到小区住户牛奶柜并发短信,不需要现在去拿

程序分为CPU计算型和I/O读写型,线程尤其是被内核调度的线程是及其珍贵的资源(JAVA计划在JDK将来的版本实现由JVM”自己“调度的轻型线程),在有限的线程资源下CPU计算型程序不但不会有明显提升,反而由于频繁的上下文切换导致性能下降(这也是Redis这种基于内存的数据库采用单工作线程并且速度非常快的原因,另一个重要的原因是单线程导致了不用为共享资源给线程加/解锁造成人为阻塞),而在I/O读写型的程序中,多线程工作在以上五种模式下性能是逐步提升的(最后多说一句,还是以Redis举例,不管是Jedis-Pool这种池化客户端还是Lettuce这种单连接客户端,当多用户接入Redis服务器时一定是多连接的,这时候就要用到多路复用来处理用户请求了,至于为什么没有用异步,一个原因是工作线程是单线程,另一个原因是异步I/O模型在性能提升方面有限并且复杂度高,以至于Netty在新版本的包中把这种模式删除了)

_失温 2022-09-19 12:55:28

同步IO,由用户程序主动去调用系统调用获知读写是否已经就绪。

异步IO,内核当读写就绪时,会回调通知用户程序。

阻塞IO,用户程序调用系统调用,如果没有就绪,内核不会返回,用户程序阻塞等待。

非阻塞,用户程序调用系统调用,如果没有就绪,内核立刻返回,用户程序不会阻塞等待。

一般来讲,按语义来说就分为同步阻塞,同步非阻塞,异步(非阻塞)三种。

檐上三寸雪 2022-09-19 12:55:28

阻塞IO:请求进程一直等待IO准备就绪。
非阻塞IO:请求进程不会等待IO准备就绪。
同步IO操作:导致请求进程阻塞,直到IO操作完成。
异步IO操作:不导致请求进程阻塞。

举个小例子来理解阻塞,非阻塞,同步和异步的关系,我们知道编写一个程序可以有多个函数,每一个函数的执行都是相互独立的,但是 对于一个程序的执行过程,每一个函数都是必须的,那么如果我们需要等待一个函数的执行结束然后返回一个结果(比如接口调用),那么我们说该函数的调用是阻塞的,对于至少有一个函数调用阻塞的程序,在执行的过程中,必定存在阻塞的一个过程,那么我们就说该程序的执行是同步的,对于异步自然就是所有的函数执行过程都是非阻塞的。

这里的程序就是一次完整的IO,一个函数为IO在执行过程中的一个独立的小片段。

如果没有你 2022-09-19 12:55:28

同步与异步

针对调用API 之后结果怎么给到调用者

  • 同步:API 的返回值
  • 异步:API 通过回调函数、事件等方式通知调用者

阻塞与非阻塞

针对调用 API 之后,调用线程的行为

  • 阻塞: 等待 API返回后才继续执行
  • 非阻塞: API 调用后继续执行(不等待API返回)
~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文