Java IO

Java IO

BIO

ServerSocket 示例

{
 ExecutorService executor = Excutors.newFixedThreadPollExecutor(100);//线程池

 ServerSocket serverSocket = new ServerSocket();
 serverSocket.bind(8088);
 while(!Thread.currentThread.isInturrupted()){//主线程死循环等待新连接到来
    Socket socket = serverSocket.accept();
    executor.submit(new ConnectIOnHandler(socket));//为新的连接创建新的线程
}

class ConnectIOnHandler extends Thread{
    private Socket socket;
    public ConnectIOnHandler(Socket socket){
       this.socket = socket;
    }
    public void run(){
      while(!Thread.currentThread.isInturrupted()&&!socket.isClosed()){死循环处理读写事件
          String someThing = socket.read()....//读取数据
          if(someThing!=null){
             ......//处理数据
             socket.write()....//写数据
          }

      }
    }
}

这个模型最本质的问题在于,严重依赖于线程。但线程是很”贵”的资源,主要表现在:

    1. 线程的创建和销毁成本很高,在Linux这样的操作系统中,线程本质上就是一个进程。创建和销毁都是重量级的系统函数。
    1. 线程本身占用较大内存,像Java的线程栈,一般至少分配512K~1M的空间,如果系统中的线程数过千,恐怕整个JVM的内存都会被吃掉一半。
    1. 线程的切换成本是很高的。操作系统发生线程切换的时候,需要保留线程的上下文,然后执行系统调用。如果线程数过高,可能执行线程切换的时间甚至会大于线程执行的时间,这时候带来的表现往往是系统load偏高、CPU sy使用率特别高(超过20%以上),导致系统几乎陷入不可用的状态。
    1. 容易造成锯齿状的系统负载。因为系统负载是用活动线程数或CPU核心数,一旦线程数量高但外部网络环境不是很稳定,就很容易造成大量请求的结果同时返回,激活大量阻塞线程从而使系统负载压力过大。

同步调用示例

服务端响应之前,IO 会阻塞在:java.net.SocketInputStream#socketRead0native 方法上:

通过 jstack 日志,可以发现,此时这个 Thread 会一直在 runable 的状态:

"main"#1 prio=5 os_prio=31 tid=0x00007fed0c810000 nid=0x1003 runnable [0x000070000ce14000]   java.lang.Thread.State: RUNNABLE
        at java.net.SocketInputStream.socketRead0(Native Method)
        at java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
        at java.net.SocketInputStream.read(SocketInputStream.java:171)
        at java.net.SocketInputStream.read(SocketInputStream.java:141)
        at org.apache.http.impl.conn.LoggingInputStream.read(LoggingInputStream.java:84)

线程模型示例:

同步最大的问题是在IO等待的过程中,线程资源没有得到充分的利用,对于大量IO场景的业务吞吐量会有一定限制

性能问题

I/O 操作分为磁盘 I/O 操作和网络 I/O 操作。前者是从磁盘中读取数据源输入到内存中,之后将读取的信息持久化输出在物理磁盘上;后者是从网络中读取信息输入到内存,最终将信息输出到网络中。但不管是磁盘 I/O 还是网络 I/O,在传统 I/O 中都存在严重的性能问题。

(1) 多次内存复制

在传统 I/O 中,我们可以通过 InputStream 从源数据中读取数据流输入到缓冲区里,通过 OutputStream 将数据输出到外部设备(包括磁盘、网络)。你可以先看下输入操作在操作系统中的具体流程,如下图所示:

  • JVM 会发出 read() 系统调用,并通过 read 系统调用向内核发起读请求;
  • 内核向硬件发送读指令,并等待读就绪;
  • 内核把将要读取的数据复制到指向的内核缓存中;
  • 操作系统内核将数据复制到用户空间缓冲区,然后 read 系统调用返回。

在这个过程中,数据先从外部设备复制到内核空间,再从内核空间复制到用户空间,这就发生了两次内存复制操作。这种操作会导致不必要的数据拷贝和上下文切换,从而降低 I/O 的性能。

(2) 阻塞

在传统 I/O 中,InputStreamread() 是一个 while 循环操作,它会一直等待数据读取,直到数据就绪才会返回。这就意味着如果没有数据就绪,这个读取操作将会一直被挂起,用户线程将会处于阻塞状态。

在少量连接请求的情况下,使用这种方式没有问题,响应速度也很高。但在发生大量连接请求时,就需要创建大量监听线程,这时如果线程没有数据就绪就会被挂起,然后进入阻塞状态。一旦发生线程阻塞,这些线程将会不断地抢夺 CPU 资源,从而导致大量的 CPU 上下文切换,增加系统的性能开销

NIO

对 IO 进行优化

(1) 使用缓冲区优化读写流操作

NIO 与传统 I/O 不同,它是基于块(Block)的,它以块为基本单位处理数据。在 NIO 中,最为重要的两个组件是缓冲区(Buffer)和通道(Channel)。Buffer 是一块连续的内存块,是 NIO 读写数据的中转地。Channel 表示缓冲数据的源头或者目的地,它用于读取缓冲或者写入数据,是访问缓冲的接口。

传统 I/O 和 NIO 的最大区别就是传统 I/O 是面向流,NIO 是面向 Buffer。Buffer 可以将文件一次性读入内存再做后续处理,而传统的方式是边读文件边处理数据。虽然传统 I/O 后面也使用了缓冲块,例如 BufferedInputStream,但仍然不能和 NIO 相媲美。使用 NIO 替代传统 I/O 操作,可以提升系统的整体性能,效果立竿见影。

(2) 使用 DirectBuffer 减少内存复制

NIO 的 Buffer 除了做了缓冲块优化之外,还提供了一个可以直接访问物理内存的类 DirectBuffer。普通的 Buffer 分配的是 JVM 堆内存,而 DirectBuffer 是直接分配物理内存 (非堆内存)。

我们知道数据要输出到外部设备,必须先从用户空间复制到内核空间,再复制到输出设备,而在 Java 中,在用户空间中又存在一个拷贝,那就是从 Java 堆内存中拷贝到临时的直接内存中,通过临时的直接内存拷贝到内存空间中去。此时的直接内存和堆内存都是属于用户空间。

你肯定会在想,为什么 Java 需要通过一个临时的非堆内存来复制数据呢?如果单纯使用 Java 堆内存进行数据拷贝,当拷贝的数据量比较大的情况下,Java 堆的 GC 压力会比较大,而使用非堆内存可以减低 GC 的压力。

DirectBuffer 则是直接将步骤简化为数据直接保存到非堆内存,从而减少了一次数据拷贝。以下是 JDK 源码中 IOUtil.java 类中的 write 方法:

if (src instanceof DirectBuffer)
    return writeFromNativeBuffer(fd, src, position, nd);

// Substitute a native buffer
int pos = src.position();
int lim = src.limit();
assert (pos <= lim);
int rem = (pos <= lim ? lim - pos : 0); 
ByteBuffer bb = Util.getTemporaryDirectBuffer(rem);
try {
    bb.put(src);
    bb.flip();
// ...............

这里拓展一点,由于 DirectBuffer 申请的是非 JVM 的物理内存,所以创建和销毁的代价很高。DirectBuffer 申请的内存并不是直接由 JVM 负责垃圾回收,但在 DirectBuffer 包装类被回收时,会通过 Java Reference 机制来释放该内存块。

DirectBuffer 只优化了用户空间内部的拷贝,而之前我们是说优化用户空间和内核空间的拷贝,那 Java 的 NIO 中是否能做到减少用户空间和内核空间的拷贝优化呢?

答案是可以的,DirectBuffer 是通过 unsafe.allocateMemory(size) 方法分配内存,也就是基于本地类 Unsafe 类调用 native 方法进行内存分配的。而在 NIO 中,还存在另外一个 Buffer 类:MappedByteBuffer,跟 DirectBuffer 不同的是,MappedByteBuffer 是通过本地类调用 mmap 进行文件内存映射的,map() 系统调用方法会直接将文件从硬盘拷贝到用户空间,只进行一次数据拷贝,从而减少了传统的 read() 方法从硬盘拷贝到内核空间这一步。

(3) 避免阻塞,优化 I/O 操作

NIO 很多人也称之为 Non-block I/O,即非阻塞 I/O,因为这样叫,更能体现它的特点。为什么这么说呢?

传统的 I/O 即使使用了缓冲块,依然存在阻塞问题。由于线程池线程数量有限,一旦发生大量并发请求,超过最大数量的线程就只能等待,直到线程池中有空闲的线程可以被复用。而对 Socket 的输入流进行读取时,读取流会一直阻塞,直到发生以下三种情况的任意一种才会解除阻塞:

  • 有数据可读;
  • 连接释放;
  • 空指针或 I/O 异常。

阻塞问题,就是传统 I/O 最大的弊端。NIO 发布后,通道和多路复用器这两个基本组件实现了 NIO 的非阻塞,下面我们就一起来了解下这两个组件的优化原理。

零拷贝

在 I/O 复用模型中,执行读写 I/O 操作依然是阻塞的,在执行读写 I/O 操作时,存在着多次内存拷贝和上下文切换,给系统增加了性能开销。

零拷贝是一种避免多次内存复制的技术,用来优化读写 I/O 操作。

在网络编程中,通常由 read、write 来完成一次 I/O 读写操作。每一次 I/O 读写操作都需要完成四次内存拷贝,路径是 I/O 设备 -> 内核空间 -> 用户空间 -> 内核空间 -> 其它 I/O 设备。

Linux 内核中的 mmap 函数可以代替 read、write 的 I/O 读写操作,实现用户空间和内核空间共享一个缓存数据。mmap 将用户空间的一块地址和内核空间的一块地址同时映射到相同的一块物理内存地址,不管是用户空间还是内核空间都是虚拟地址,最终要通过地址映射映射到物理内存地址。这种方式避免了内核空间与用户空间的数据交换。I/O 复用中的 epoll 函数中就是使用了 mmap 减少了内存拷贝。

在 Java 的 NIO 编程中,则是使用到了 Direct Buffer 来实现内存的零拷贝。Java 直接在 JVM 内存空间之外开辟了一个物理内存空间,这样内核和用户进程都能共享一份缓存数据。

服务器提供文件传输功能,需要将磁盘上的文件读取出来,通过网络协议发送到客户端。

通常,你会选择最直接的方法:从网络请求中找出文件在磁盘中的路径后,如果这个文件比较大,假设有 320MB,可以在内存中分配 32KB 的缓冲区,再把文件分成一万份,每份只有 32KB,这样,从文件的起始位置读入 32KB 到缓冲区,再通过网络 API 把这 32KB 发送到客户端。接着重复一万次,直到把完整的文件都发送完毕。

  • 它至少经历了 4 万次用户态与内核态的上下文切换,因为每处理 32KB 的消息,就需要一次 read 调用和一次 write 调用,每次系统调用都得先从用户态切换到内核态,等内核完成任务后,再从内核态切换回用户态。可见,每处理 32KB,就有 4 次上下文切换,重复 1 万次后就有 4 万次切换。
  • 这个方案做了 4 万次内存拷贝,对 320MB 文件拷贝的字节数也翻了 4 倍,到了1280MB。很显然,过多的内存拷贝无谓地消耗了 CPU 资源,降低了系统的并发处理能力。

零拷贝是操作系统提供的新函数。同时接收文件描述符和 TCP socket 作为输入参数,这样执行时就可以完全在内核态完成内存拷贝,既减少了内存拷贝次数,也降低了上下文切换次数。

Channels

打开 SocketChannel

SocketChannel ch = SocketChannel.open();  
ch.connect(new InetSocketAddress("somehost", someport));  

Server 监听在某个端口上:

ServerSocketChannel ch = ServerSocketChannel.open();  
ch.socket().bind (new InetSocketAddress (somelocalport));  

最开始,在应用程序调用操作系统 I/O 接口时,是由 CPU 完成分配,这种方式最大的问题是“发生大量 I/O 请求时,非常消耗 CPU“;之后,操作系统引入了 DMA(直接存储器存储),内核空间与磁盘之间的存取完全由 DMA 负责,但这种方式依然需要向 CPU 申请权限,且需要借助 DMA 总线来完成数据的复制操作,如果 DMA 总线过多,就会造成总线冲突。

通道的出现解决了以上问题,Channel 有自己的处理器,可以完成内核空间和磁盘之间的 I/O 操作。在 NIO 中,我们读取和写入数据都要通过 Channel,由于 Channel 是双向的,所以读、写可以同时进行。

Buffers

Selectors

Selector 是 Java NIO 编程的基础。用于检查一个或多个 NIO Channel 的状态是否处于可读、可写。

Selector 是基于事件驱动实现的,我们可以在 Selector 中注册 accpet、read 监听事件,Selector 会不断轮询注册在其上的 Channel,如果某个 Channel 上面发生监听事件,这个 Channel 就处于就绪状态,然后进行 I/O 操作。

一个线程使用一个 Selector,通过轮询的方式,可以监听多个 Channel 上的事件。我们可以在注册 Channel 时设置该通道为非阻塞,当 Channel 上没有 I/O 操作时,该线程就不会一直等待了,而是会不断轮询所有 Channel,从而避免发生阻塞。

目前操作系统的 I/O 多路复用机制都使用了 epoll,相比传统的 select 机制,epoll 没有最大连接句柄 1024 的限制。所以 Selector 在理论上可以轮询成千上万的客户端。

NIO 编程示例

下面具体看下如何利用事件模型单线程处理所有I/O请求:

NIO 的主要事件有几个:读就绪、写就绪、有新连接到来

我们首先需要注册当这几个事件到来的时候所对应的处理器。然后在合适的时机告诉事件选择器:我对这个事件感兴趣。对于写操作,就是写不出去的时候对写事件感兴趣;对于读操作,就是完成连接和系统没有办法承载新读入的数据的时;对于accept,一般是服务器刚启动的时候;而对于connect,一般是connect失败需要重连或者直接异步调用connect的时候。

其次,用一个死循环选择就绪的事件,会执行系统调用(Linux 2.6之前是selectpoll,2.6之后是epoll,Windows是IOCP),还会阻塞的等待新事件的到来。新事件到来的时候,会在selector上注册标记位,标示可读、可写或者有连接到来。

注意,select是阻塞的,无论是通过操作系统的通知(epoll)还是不停的轮询(selectpoll),这个函数是阻塞的。所以你可以放心大胆地在一个while(true)里面调用这个函数而不用担心CPU空转。

interface ChannelHandler{
    void channelReadable(Channel channel);
    void channelWritable(Channel channel);
}
class Channel{
    Socket socket;
    Event event;//读,写或者连接
}

//IO线程主循环:
class IoThread extends Thread{

    @Override
    public void run() {
        Channel channel;
        while(channel=Selector.select()){//选择就绪的事件和对应的连接
            if(channel.event==accept){
                registerNewChannelHandler(channel);//如果是新连接,则注册一个新的读写处理器
            }
            if(channel.event==write){
                getChannelHandler(channel).channelWritable(channel);//如果可以写,则执行写事件
            }
            if(channel.event==read){
                getChannelHandler(channel).channelReadable(channel);//如果可以读,则执行读事件
            }
        }
    }
    Map<ChannelChannelHandler> handlerMap;//所有channel的对应事件处理器
}

Future 调用

(1) JDK NIO & Future

这样做的好处是,主线程可以不用等待IO响应,可以去做点其他的,比如说再发送一个IO请求,可以等到一起返回:

"main"#1 prio=5 os_prio=31 tid=0x00007fd7a500b000 nid=0xe03 waiting on condition [0x000070000a95d000]   java.lang.Thread.State: WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
- parking to wait for  <0x000000076ee2d768> (a java.util.concurrent.CountDownLatch$Sync)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)

主线程在等待结果返回过程中依然需要等待,没有根本解决此问题。

(2) 使用 Callback 回调方式

可不可以在主线程完成发送请求后,再也不用关心这个逻辑,去执行其他的逻辑?那就可以使用 Callback 机制。

如此一来,主线程再也不需要关心发起IO后的业务逻辑,发送完请求后,就可以彻底去干其他事情,或者回到线程池中再供调度。如果是HttpServer,那么需要结合Servlet 3.1的异步Servlet

使用Callback方式,从线程模型中看,发现线程资源已经得到了比较充分的利用,整个过程中已经没有线程阻塞。

(3) JDK 1.8 CompletableFuture

回调地狱,当Callback的线程还需要执行下一个IO调用的时候,这个时候进入回调地狱模式。

典型的应用场景如,通过经纬度获取行政区域adcode(逆地理接口),然后再根据获得的adcode,获取当地的天气信息(天气接口)。

那么有没有办法解决Callback Hell的问题?当然有,JDK 1.8中提供了CompletableFuture,先看看它是怎么解决这个问题的。

将逆地理的Callback逻辑,封装成一个独立的CompletableFuture,当异步线程回调时,调用 future.complete(T) ,将结果封装。

将天气执行的Call逻辑,也封装成为一个独立的CompletableFuture ,完成之后,逻辑同上。

compose衔接,whenComplete输出:

每一个IO操作,均可以封装为独立的CompletableFuture,从而避免回调地狱。

select

select() 函数:它的用途是,在超时时间内,监听用户感兴趣的文件描述符上的可读可写和异常事件的发生。Linux 操作系统的内核将所有外部设备都看做一个文件来操作,对一个文件的读写操作会调用内核提供的系统命令,返回一个文件描述符(fd)。

int select(int maxfdp1,fd_set *readset,fd_set *writeset,fd_set *exceptset,const struct timeval *timeout)

select() 函数监视的文件描述符分 3 类,分别是 writefds(写文件描述符)、readfds(读文件描述符)以及 exceptfds(异常事件文件描述符)。

调用后 select() 函数会阻塞,直到有描述符就绪或者超时,函数返回。当 select 函数返回后,可以通过函数 FD_ISSET 遍历 fdset,来找到就绪的描述符。fd_set 可以理解为一个集合,这个集合中存放的是文件描述符,可通过以下四个宏进行设置:

void FD_ZERO(fd_set *fdset);           //清空集合
void FD_SET(int fd, fd_set *fdset);   //将一个给定的文件描述符加入集合之中
void FD_CLR(int fd, fd_set *fdset);   //将一个给定的文件描述符从集合中删除
int FD_ISSET(int fd, fd_set *fdset);   // 检查集合中指定的文件描述符是否可以读写 

select 不断轮循去监听 socket,socket 个数有限制,一般是 1024/2048FD_SETSIZE 控制,需要重新编译内核才可以修改宏。线性扫描全部 socket 集合。

select 缺点:每次调用 select 都需要把 fd 集合从用户态拷贝到内核态epoll 对于每个文件描述符只会拷贝一次到内核,不用重复拷贝。

poll

poll() 函数:在每次调用 select() 函数之前,系统需要把一个 fd 从用户态拷贝到内核态,这样就给系统带来了一定的性能开销。再有单个进程监视的 fd 数量默认是 1024,我们可以通过修改宏定义甚至重新编译内核的方式打破这一限制。但由于 fd_set 是基于数组实现的,在新增和删除 fd 时,数量过大会导致效率降低。

poll() 的机制与 select() 类似,二者在本质上差别不大。poll() 管理多个描述符也是通过轮询,根据描述符的状态进行处理,但 poll() 没有最大文件描述符数量的限制。

poll() 和 select() 存在一个相同的缺点,那就是包含大量文件描述符的数组被整体复制到用户态和内核的地址空间之间,而无论这些文件描述符是否就绪,他们的开销都会随着文件描述符数量的增加而线性增大。

poll 轮循方式监听,只不过没有最大连接数的限制。线性扫描

epoll

epoll 不是轮循监听,而是 socket 有变化时通过回调的方式主动告知用户进程。

epoll 缺点:

  • 大部分套接字活跃的情况下,性能比不上 select/poll。它更适合处理大量的 fd,且活跃 fd 不是很多的情,反之如果处理的 fd 量不大,且基本都是活跃的,过多使用 epoll_ctl,效率相比还有稍微的下降。
  • 跨平台性差

epoll 原理:

epoll 由 epoll_createepoll_ctlepoll_wait 这三个底层函数支持,使用的时候用 epoll_create 创建对象,参数 site 是保证内核保证能正确处理的最大句柄数。epoll_ctl 可以添加/移除 socket。epoll_wait 在给定的 timeout 时间内,如果有事件发生,那么就返回变化的文件句柄。

// 参数size是内核保证能够正确处理的最大句柄数,多于这个最大数时内核可不保证效果。
int epoll_create(int size);  
// 将刚建立的socket加入到epoll中让其监控,或者把 epoll正在监控的某个socket句柄移出epoll,不再监控它等等(也就是将I/O流放到内核)。
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);  
// 在给定的timeout时间内,当在监控的所有句柄中有事件发生时,就返回用户态的进程(也就是在内核层面捕获可读写的I/O事件)。
int epoll_wait(int epfd, struct epoll_event *events,int maxevents, int timeout);

epoll 维护了一棵红黑树来跟踪所有待检测的文件描述字,黑红树的使用减少了内核和用户空间大量的数据拷贝和内存分配,大大提高了性能。

同时,epoll 维护了一个链表来记录就绪事件,内核在每个文件有事件发生时将自己登记到这个就绪事件列表中,通过内核自身的文件 file-eventpoll 之间的回调和唤醒机制,减少了对内核描述字的遍历,大大加速了事件通知和检测的效率,这也为 level-triggered 和 edge-triggered 的实现带来了便利。

另外 epoll 有 ET 和 LT 两种触发模式,LT 的话,只要一个句柄上的事件没有处理完,以后调用 epoll_wait 还会返回这个句柄,而 ET 仅仅在第一次返回。

epoll 使用流程:

int epoll_fd = epoll_create(0);
epoll_ctl(epoll_fd, EPOLL_CTL_ADD, 0, &event);
while(running) 
    event_count = epoll_wait(epoll_fd, events, MAX_EVENTS, 30000);

epoll 线程安全

它是线程安全的, but there isn’t much documentation that explicitly states that. See here.

BTW, you can also have multiple threads waiting on a single epoll_fd, but in that case it can get a bit tricky. (I.e. you might want to use edge-triggered EPOLLET or oneshot mode EPOLLONESHOT. See here.)

参考

epoll vs poll/select

首先,poll/select 先将要监听的 fd 从用户空间拷贝到内核空间, 然后在内核空间里面进行处理之后,再拷贝给用户空间。这里就涉及到内核空间申请内存 (select 是先尝试申请栈上资源, 如果需要监听的 fd 比较多, 就会去申请堆空间的资源),释放内存等等过程,这在大量 fd 情况下,是非常耗时的。而 epoll 维护了一个红黑树,通过对这棵黑红树进行操作,可以避免大量的内存申请和释放的操作,而且查找速度非常快。

第二,select/poll 从休眠中被唤醒时,如果监听多个 fd,只要其中有一个 fd 有事件发生,内核就会遍历内部的 list 去检查到底是哪一个事件到达,并没有像 epoll 一样, 通过 fd 直接关联 eventpoll 对象,快速地把 fd 直接加入到 eventpoll 的就绪列表中。

NIO2(AIO)

在Linux中,AIO并未真正使用操作系统所提供的异步I/O,它仍然使用poll或epoll,并将API封装为异步I/O的样子,但是其本质仍然是同步非阻塞I/O,加上第三方产品的出现,Java网络编程明显落后,所以没有成为主流。

参考