UNIX 系统下的 I/O 模型有 5 种:同步阻塞 I/O、同步非阻塞 I/O、I/O 多路复用、信号驱动 I/O 和异步 I/O。

所谓的I/O 就是计算机内存与外部设备之间拷贝数据的过程

Java I/O 模型

对于一个网络 I/O 通信过程,比如网络数据读取,会涉及两个对象,一个是调用这个 I/O 操作的用户线程,另外一个就是操作系统内核。一个进程的地址空间分为用户空间和内核空间,用户线程不能直接访问内核空间。

当用户线程发起 I/O 操作后,网络数据读取操作会经历两个步骤:

  • 用户线程等待内核将数据从网卡拷贝到内核空间。
  • 内核将数据从内核空间拷贝到用户空间。

各种 I/O 模型的区别就是:它们实现这两个步骤的方式是不一样的。

同步阻塞 I/O:用户线程发起 read 调用后就阻塞了,让出 CPU。内核等待网卡数据到来,把数据从网卡拷贝到内核空间,接着把数据拷贝到用户空间,再把用户线程叫醒。

image-20220112150728564

同步非阻塞 I/O:用户线程不断的发起 read 调用,数据没到内核空间时,每次都返回失败,直到数据到了内核空间,这一次 read 调用后,在等待数据从内核空间拷贝到用户空间这段时间里,线程还是阻塞的,等数据到了用户空间再把线程叫醒。

image-20220112150807336

I/O 多路复用:用户线程的读取操作分成两步了,线程先发起 select 调用,目的是问内核数据准备好了吗?等内核把数据准备好了,用户线程再发起 read 调用。在等待数据从内核空间拷贝到用户空间这段时间里,线程还是阻塞的。那为什么叫 I/O 多路复用呢?因为一次 select 调用可以向内核查多个数据通道(Channel)的状态,所以叫多路复用。

image-20220112150900210

异步 I/O:用户线程发起 read 调用的同时注册一个回调函数,read 立即返回,等内核将数据准备好后,再调用指定的回调函数完成处理。在这个过程中,用户线程一直没有阻塞。

image-20220112150917214

总体工作流程

我们知道,对于 Java 的多路复用器的使用,无非是两步:

  1. 创建一个 Seletor,在它身上注册各种感兴趣的事件,然后调用 select 方法,等待感兴趣的事情发生。

  2. 感兴趣的事情发生了,比如可以读了,这时便创建一个新的线程从 Channel 中读数据。

tomcat 的 NioEndpoint 组件虽然实现比较复杂,但基本原理就是上面两步。我们先来看看它有哪些组件,它一共包含 LimitLatch、Acceptor、Poller、SocketProcessor 和 Executor 共 5 个组件,它们的工作过程如下图所示。

image-20220112151229014

LimitLatch 是连接控制器,它负责控制最大连接数,NIO 模式下默认是 10000,达到这个阈值后,连接请求被拒绝。

Acceptor 跑在一个单独的线程里,它在一个死循环里调用 accept 方法来接收新连接,一旦有新的连接请求到来,accept 方法返回一个 Channel 对象,接着把 Channel 对象交给 Poller 去处理。

Poller 的本质是一个 Selector,也跑在单独线程里。Poller 在内部维护一个 Channel 数组,它在一个死循环里不断检测 Channel 的数据就绪状态,一旦有 Channel 可读,就生成一个 SocketProcessor 任务对象扔给 Executor 去处理。

Executor 就是线程池,负责运行 SocketProcessor 任务类,SocketProcessor 的 run 方法会调用 Http11Processor 来读取和解析请求数据。我们知道,Http11Processor 是应用层协议的封装,它会调用容器获得响应,再把响应通过 Channel 写出。

LimitLatch

LimitLatch 用来控制连接个数,当连接数到达最大时阻塞线程,直到后续组件处理完一个连接后将连接数减 1。请你注意到达最大连接数后操作系统底层还是会接收客户端连接,但用户层已经不再接收。LimitLatch 的核心代码如下:

public class LimitLatch {
    private class Sync extends AbstractQueuedSynchronizer {
     
        @Override
        protected int tryAcquireShared() {
            long newCount = count.incrementAndGet();
            if (newCount > limit) {
                count.decrementAndGet();
                return -1;
            } else {
                return 1;
            }
        }
 
        @Override
        protected boolean tryReleaseShared(int arg) {
            count.decrementAndGet();
            return true;
        }
    }
 
    private final Sync sync;
    private final AtomicLong count;
    private volatile long limit;
    
    // 线程调用这个方法来获得接收新连接的许可,线程可能被阻塞
    public void countUpOrAwait() throws InterruptedException {
      sync.acquireSharedInterruptibly(1);
    }
 
    // 调用这个方法来释放一个连接许可,那么前面阻塞的线程可能被唤醒
    public long countDown() {
      sync.releaseShared(0);
      long result = getCount();
      return result;
   }
}

从上面的代码我们看到,LimitLatch 内步定义了内部类 Sync,而 Sync 扩展了 AQS,AQS 是 Java 并发包中的一个核心类,它在内部维护一个状态和一个线程队列,可以用来控制线程什么时候挂起,什么时候唤醒。我们可以扩展它来实现自己的同步器,实际上 Java 并发包里的锁和条件变量等等都是通过 AQS 来实现的,而这里的 LimitLatch 也不例外。

理解上面的代码时有两个要点:

  1. 用户线程通过调用 LimitLatch 的 countUpOrAwait 方法来拿到锁,如果暂时无法获取,这个线程会被阻塞到 AQS 的队列中。那 AQS 怎么知道是阻塞还是不阻塞用户线程呢?其实这是由 AQS 的使用者来决定的,也就是内部类 Sync 来决定的,因为 Sync 类重写了 AQS 的tryAcquireShared() 方法。它的实现逻辑是如果当前连接数 count 小于 limit,线程能获取锁,返回 1,否则返回 -1。

  2. 如何用户线程被阻塞到了 AQS 的队列,那什么时候唤醒呢?同样是由 Sync 内部类决定,Sync 重写了 AQS 的releaseShared() 方法,其实就是当一个连接请求处理完了,这时又可以接收一个新连接了,这样前面阻塞的线程将会被唤醒。

其实你会发现 AQS 就是一个骨架抽象类,它帮我们搭了个架子,用来控制线程的阻塞和唤醒。具体什么时候阻塞、什么时候唤醒由你来决定。我们还注意到,当前线程数被定义成原子变量 AtomicLong,而 limit 变量用 volatile 关键字来修饰,这些并发编程的实际运用。

Acceptor

Acceptor 实现了 Runnable 接口,因此可以跑在单独线程里。一个端口号只能对应一个 ServerSocketChannel,因此这个 ServerSocketChannel 是在多个 Acceptor 线程之间共享的,它是 Endpoint 的属性,由 Endpoint 完成初始化和端口绑定。初始化过程如下:

serverSock = ServerSocketChannel.open();
serverSock.socket().bind(addr,getAcceptCount());
serverSock.configureBlocking(true);

从上面的初始化代码我们可以看到两个关键信息:

1.bind 方法的第二个参数表示操作系统的等待队列长度,我在上面提到,当应用层面的连接数到达最大值时,操作系统可以继续接收连接,那么操作系统能继续接收的最大连接数就是这个队列长度,可以通过 acceptCount 参数配置,默认是 100。

2.ServerSocketChannel 被设置成阻塞模式,也就是说它是以阻塞的方式接收连接的。

ServerSocketChannel 通过 accept() 接受新的连接,accept() 方法返回获得 SocketChannel 对象,然后将 SocketChannel 对象封装在一个 PollerEvent 对象中,并将 PollerEvent 对象压入 Poller 的 Queue 里,这是个典型的生产者 - 消费者模式,Acceptor 与 Poller 线程之间通过 Queue 通信。

Poller

Poller 本质是一个 Selector,它内部维护一个 Queue,这个 Queue 定义如下:

private final SynchronizedQueue<PollerEvent> events = new SynchronizedQueue<>();

SynchronizedQueue 的方法比如 offer、poll、size 和 clear 方法,都使用了 Synchronized 关键字进行修饰,用来保证同一时刻只有一个 Acceptor 线程对 Queue 进行读写。同时有多个 Poller 线程在运行,每个 Poller 线程都有自己的 Queue。每个 Poller 线程可能同时被多个 Acceptor 线程调用来注册 PollerEvent。同样 Poller 的个数可以通过 pollers 参数配置。

Poller 不断的通过内部的 Selector 对象向内核查询 Channel 的状态,一旦可读就生成任务类 SocketProcessor 交给 Executor 去处理。Poller 的另一个重要任务是循环遍历检查自己所管理的 SocketChannel 是否已经超时,如果有超时就关闭这个 SocketChannel。

SocketProcessor

我们知道,Poller 会创建 SocketProcessor 任务类交给线程池处理,而 SocketProcessor 实现了 Runnable 接口,用来定义 Executor 中线程所执行的任务,主要就是调用 Http11Processor 组件来处理请求。Http11Processor 读取 Channel 的数据来生成 ServletRequest 对象,这里请你注意:

Http11Processor 并不是直接读取 Channel 的。这是因为 tomcat 支持同步非阻塞 I/O 模型和异步 I/O 模型,在 Java API 中,相应的 Channel 类也是不一样的,比如有 AsynchronousSocketChannel 和 SocketChannel,为了对 Http11Processor 屏蔽这些差异,tomcat 设计了一个包装类叫作 SocketWrapper,Http11Processor 只调用 SocketWrapper 的方法去读写数据。

Executor

Executor 是 tomcat 定制版的线程池,它负责创建真正干活的工作线程,干什么活呢?就是执行 SocketProcessor 的 run 方法,也就是解析请求并通过容器来处理请求,最终会调用到我们的 Servlet。

Java NIO.2 回顾

public class Nio2Server {
 
   void listen(){
      //1. 创建一个线程池
      ExecutorService es = Executors.newCachedThreadPool();
 
      //2. 创建异步通道群组
      AsynchronousChannelGroup tg = AsynchronousChannelGroup.withCachedThreadPool(es, 1);
      
      //3. 创建服务端异步通道
      AsynchronousServerSocketChannel assc = AsynchronousServerSocketChannel.open(tg);
 
      //4. 绑定监听端口
      assc.bind(new InetSocketAddress(8080));
 
      //5. 监听连接,传入回调类处理连接请求
      assc.accept(this, new AcceptHandler()); 
   }
}

上面的代码主要做了 5 件事情:

  1. 创建一个线程池,这个线程池用来执行来自内核的回调请求。
  2. 创建一个 AsynchronousChannelGroup,并绑定一个线程池。
  3. 创建 AsynchronousServerSocketChannel,并绑定到 AsynchronousChannelGroup。
  4. 绑定一个监听端口。
  5. 调用 accept 方法开始监听连接请求,同时传入一个回调类去处理连接请求。请你注意,accept 方法的第一个参数是 this 对象,就是 Nio2Server 对象本身。

你可能会问,为什么需要创建一个线程池呢?其实在异步 I/O 模型里,应用程序不知道数据在什么时候到达,因此向内核注册回调函数,当数据到达时,内核就会调用这个回调函数。同时为了提高处理速度,会提供一个线程池给内核使用,这样不会耽误内核线程的工作,内核只需要把工作交给线程池就立即返回了。

我们再来看看处理连接的回调类 AcceptHandler 是什么样的。

//AcceptHandler 类实现了 CompletionHandler 接口的 completed 方法。它还有两个模板参数,第一个是异步通道,第二个就是 Nio2Server 本身
public class AcceptHandler implements CompletionHandler<AsynchronousSocketChannel, Nio2Server> {
 
   // 具体处理连接请求的就是 completed 方法,它有两个参数:第一个是异步通道,第二个就是上面传入的 NioServer 对象
   @Override
   public void completed(AsynchronousSocketChannel asc, Nio2Server attachment) {      
      // 调用 accept 方法继续接收其他客户端的请求
      attachment.assc.accept(attachment, this);
      
      // 1. 先分配好 Buffer,告诉内核,数据拷贝到哪里去
      ByteBuffer buf = ByteBuffer.allocate(1024);
      
      // 2. 调用 read 函数读取数据,除了把 buf 作为参数传入,还传入读回调类
      channel.read(buf, buf, new ReadHandler(asc)); 
 
}

我们看到它实现了 CompletionHandler 接口,下面我们先来看看 CompletionHandler 接口的定义。

public interface CompletionHandler<V,A> {
 
    void completed(V result, A attachment);
    void failed(Throwable exc, A attachment);
}

CompletionHandler 接口有两个模板参数 V 和 A,分别表示 I/O 调用的返回值和附件类。比如 accept 的返回值就是 AsynchronousSocketChannel,而附件类由用户自己决定,在 accept 的调用中,我们传入了一个 Nio2Server。因此 AcceptHandler 带有了两个模板参数:AsynchronousSocketChannel 和 Nio2Server。

CompletionHandler 有两个方法:completed 和 failed,分别在 I/O 操作成功和失败时调用。completed 方法有两个参数,其实就是前面说的两个模板参数。也就是说,Java 的 NIO.2 在调用回调方法时,会把返回值和附件类当作参数传给 NIO.2 的使用者。

下面我们再来看看处理读的回调类 ReadHandler 长什么样子。

public class ReadHandler implements CompletionHandler<Integer, ByteBuffer> {   
    // 读取到消息后的处理  
    @Override  
    public void completed(Integer result, ByteBuffer attachment) {  
        //attachment 就是数据,调用 flip 操作,其实就是把读的位置移动最前面
        attachment.flip();  
        // 读取数据
        ... 
    }  
 
    void failed(Throwable exc, A attachment){
        ...
    }
}

read 调用的返回值是一个整型数,所以我们回调方法里的第一个参数就是一个整型,表示有多少数据被读取到了 Buffer 中。第二个参数是一个 ByteBuffer,这是因为我们在调用 read 方法时,把用来存放数据的 ByteBuffer 当作附件类传进去了,所以在回调方法里,有 ByteBuffer 类型的参数,我们直接从这个 ByteBuffer 里获取数据。

Nio2Endpoint

我们先通过一张图来看看 Nio2Endpoint 有哪些组件。

image-20220112154312556

从图上看,总体工作流程跟 NioEndpoint 是相似的。

LimitLatch 是连接控制器,它负责控制最大连接数。

Nio2Acceptor 扩展了 Acceptor,用异步 I/O 的方式来接收连接,跑在一个单独的线程里,也是一个线程组。Nio2Acceptor 接收新的连接后,得到一个 AsynchronousSocketChannel,Nio2Acceptor 把 AsynchronousSocketChannel 封装成一个 Nio2SocketWrapper,并创建一个 SocketProcessor 任务类交给线程池处理,并且 SocketProcessor 持有 Nio2SocketWrapper 对象。

Executor 在执行 SocketProcessor 时,SocketProcessor 的 run 方法会调用 Http11Processor 来处理请求,Http11Processor 会通过 Nio2SocketWrapper 读取和解析请求数据,请求经过容器处理后,再把响应通过 Nio2SocketWrapper 写出。

需要你注意 Nio2Endpoint 跟 NioEndpoint 的一个明显不同点是,Nio2Endpoint 中没有 Poller 组件,也就是没有 Selector。这是为什么呢?因为在异步 I/O 模式下,Selector 的工作交给内核来做了。

Nio2Acceptor

和 NioEndpint 一样,Nio2Endpoint 的基本思路是用 LimitLatch 组件来控制连接数,但是 Nio2Acceptor 的监听连接的过程不是在一个死循环里不断的调 accept 方法,而是通过回调函数来完成的。我们来看看它的连接监听方法:

serverSock.accept(null, this);

其实就是调用了 accept 方法,注意它的第二个参数是 this,表明 Nio2Acceptor 自己就是处理连接的回调类,因此 Nio2Acceptor 实现了 CompletionHandler 接口。那么它是如何实现 CompletionHandler 接口的呢?

protected class Nio2Acceptor extends Acceptor<AsynchronousSocketChannel>
    implements CompletionHandler<AsynchronousSocketChannel, Void> {
    
@Override
public void completed(AsynchronousSocketChannel socket,
        Void attachment) {
        
    if (isRunning() && !isPaused()) {
        if (getMaxConnections() == -1) {
            // 如果没有连接限制,继续接收新的连接
            serverSock.accept(null, this);
        } else {
            // 如果有连接限制,就在线程池里跑 Run 方法,Run 方法会检查连接数
            getExecutor().execute(this);
        }
        // 处理请求
        if (!setSocketOptions(socket)) {
            closeSocket(socket);
        }
    } 
}

可以看到 CompletionHandler 的两个模板参数分别是 AsynchronousServerSocketChannel 和 Void,我在前面说过第一个参数就是 accept 方法的返回值,第二个参数是附件类,由用户自己决定,这里为 Void。completed 方法的处理逻辑比较简单:

  • 如果没有连接限制,继续在本线程中调用 accept 方法接收新的连接。
  • 如果有连接限制,就在线程池里跑 run 方法去接收新的连接。那为什么要跑 run 方法呢,因为在 run 方法里会检查连接数,当连接达到最大数时,线程可能会被 LimitLatch 阻塞。为什么要放在线程池里跑呢?这是因为如果放在当前线程里执行,completed 方法可能被阻塞,会导致这个回调方法一直不返回。

接着 completed 方法会调用 setSocketOptions 方法,在这个方法里,会创建 Nio2SocketWrapper 和 SocketProcessor,并交给线程池处理。

Nio2SocketWrapper

Nio2SocketWrapper 的主要作用是封装 Channel,并提供接口给 Http11Processor 读写数据。讲到这里你是不是有个疑问:Http11Processor 是不能阻塞等待数据的,按照异步 I/O 的套路,Http11Processor 在调用 Nio2SocketWrapper 的 read 方法时需要注册回调类,read 调用会立即返回,问题是立即返回后 Http11Processor 还没有读到数据, 怎么办呢?这个请求的处理不就失败了吗?

为了解决这个问题,Http11Processor 是通过 2 次 read 调用来完成数据读取操作的。

  • 第一次 read 调用:连接刚刚建立好后,Acceptor 创建 SocketProcessor 任务类交给线程池去处理,Http11Processor 在处理请求的过程中,会调用 Nio2SocketWrapper 的 read 方法发出第一次读请求,同时注册了回调类 readCompletionHandler,因为数据没读到,Http11Processor 把当前的 Nio2SocketWrapper 标记为数据不完整。接着 SocketProcessor 线程被回收,Http11Processor 并没有阻塞等待数据。这里请注意,Http11Processor 维护了一个 Nio2SocketWrapper 列表,也就是维护了连接的状态。
  • 第二次 read 调用:当数据到达后,内核已经把数据拷贝到 Http11Processor 指定的 Buffer 里,同时回调类 readCompletionHandler 被调用,在这个回调处理方法里会重新创建一个新的 SocketProcessor 任务来继续处理这个连接,而这个新的 SocketProcessor 任务类持有原来那个 Nio2SocketWrapper,这一次 Http11Processor 可以通过 Nio2SocketWrapper 读取数据了,因为数据已经到了应用层的 Buffer。

这个回调类 readCompletionHandler 的源码如下,最关键的一点是,Nio2SocketWrapper 是作为附件类来传递的,这样在回调函数里能拿到所有的上下文。

this.readCompletionHandler = new CompletionHandler<Integer, SocketWrapperBase<Nio2Channel>>() {
    public void completed(Integer nBytes, SocketWrapperBase<Nio2Channel> attachment) {
        ...
        // 通过附件类 SocketWrapper 拿到所有的上下文
        Nio2SocketWrapper.this.getEndpoint().processSocket(attachment, SocketEvent.OPEN_READ, false);
    }
 
    public void failed(Throwable exc, SocketWrapperBase<Nio2Channel> attachment) {
        ...
    }
}

内核如何阻塞与唤醒进程

进程和线程

我们先从 Linux 的进程谈起,操作系统要运行一个可执行程序,首先要将程序文件加载到内存,然后 CPU 去读取和执行程序指令,而一个进程就是“一次程序的运行过程”,内核会给每一个进程创建一个名为task_struct的数据结构,而内核也是一段程序,系统启动时就被加载到内存中了。

进程在运行过程中要访问内存,而物理内存是有限的,比如 16GB,那怎么把有限的内存分给不同的进程使用呢?跟 CPU 的分时共享一样,内存也是共享的,Linux 给每个进程虚拟出一块很大的地址空间,比如 32 位机器上进程的虚拟内存地址空间是 4GB,从 0x00000000 到 0xFFFFFFFF。但这 4GB 并不是真实的物理内存,而是进程访问到了某个虚拟地址,如果这个地址还没有对应的物理内存页,就会产生缺页中断,分配物理内存,MMU(内存管理单元)会将虚拟地址与物理内存页的映射关系保存在页表中,再次访问这个虚拟地址,就能找到相应的物理内存页。每个进程的这 4GB 虚拟地址空间分布如下图所示:

image-20220112182531999

进程的虚拟地址空间总体分为用户空间和内核空间,低地址上的 3GB 属于用户空间,高地址的 1GB 是内核空间,这是基于安全上的考虑,用户程序只能访问用户空间,内核程序可以访问整个进程空间,并且只有内核可以直接访问各种硬件资源,比如磁盘和网卡。那用户程序需要访问这些硬件资源该怎么办呢?答案是通过系统调用,系统调用可以理解为内核实现的函数,比如应用程序要通过网卡接收数据,会调用 Socket 的 read 函数:

ssize_t read(int fd,void *buf,size_t nbyte)

CPU 在执行系统调用的过程中会从用户态切换到内核态,CPU 在用户态下执行用户程序,使用的是用户空间的栈,访问用户空间的内存;当 CPU 切换到内核态后,执行内核代码,使用的是内核空间上的栈。

从上面这张图我们看到,用户空间从低到高依次是代码区、数据区、堆、共享库与 mmap 内存映射区、栈、环境变量。其中堆向高地址增长,栈向低地址增长。

请注意用户空间上还有一个共享库和 mmap 映射区,Linux 提供了内存映射函数 mmap, 它可将文件内容映射到这个内存区域,用户通过读写这段内存,从而实现对文件的读取和修改,无需通过 read/write 系统调用来读写文件,省去了用户空间和内核空间之间的数据拷贝,Java 的 MappedByteBuffer 就是通过它来实现的;用户程序用到的系统共享库也是通过 mmap 映射到了这个区域。

我在开始提到的task_struct结构体本身是分配在内核空间,它的vm_struct成员变量保存了各内存区域的起始和终止地址,此外task_struct中还保存了进程的其他信息,比如进程号、打开的文件、创建的 Socket 以及 CPU 运行上下文等。

在 Linux 中,线程是一个轻量级的进程,轻量级说的是线程只是一个 CPU 调度单元,因此线程有自己的task_struct结构体和运行栈区,但是线程的其他资源都是跟父进程共用的,比如虚拟地址空间、打开的文件和 Socket 等。

阻塞与唤醒

我们知道当用户线程发起一个阻塞式的 read 调用,数据未就绪时,线程就会阻塞,那阻塞具体是如何实现的呢?

Linux 内核将线程当作一个进程进行 CPU 调度,内核维护了一个可运行的进程队列,所有处于TASK_RUNNING状态的进程都会被放入运行队列中,本质是用双向链表将task_struct链接起来,排队使用 CPU 时间片,时间片用完重新调度 CPU。所谓调度就是在可运行进程列表中选择一个进程,再从 CPU 列表中选择一个可用的 CPU,将进程的上下文恢复到这个 CPU 的寄存器中,然后执行进程上下文指定的下一条指令。

image-20220112182622193

而阻塞的本质就是将进程的task_struct移出运行队列,添加到等待队列,并且将进程的状态的置为TASK_UNINTERRUPTIBLE或者TASK_INTERRUPTIBLE,重新触发一次 CPU 调度让出 CPU。

那线程怎么唤醒呢?线程在加入到等待队列的同时向内核注册了一个回调函数,告诉内核我在等待这个 Socket 上的数据,如果数据到了就唤醒我。这样当网卡接收到数据时,产生硬件中断,内核再通过调用回调函数唤醒进程。唤醒的过程就是将进程的task_struct从等待队列移到运行队列,并且将task_struct的状态置为TASK_RUNNING,这样进程就有机会重新获得 CPU 时间片。

这个过程中,内核还会将数据从内核空间拷贝到用户空间的堆上。

image-20220112182645449

当 read 系统调用返回时,CPU 又从内核态切换到用户态,继续执行 read 调用的下一行代码,并且能从用户空间上的 Buffer 读到数据了。