IO 与 NIO(二)- FileChannel、SocketChannel、ServerSocketChannel、Non-blocking Server

3 分钟读完

[TOC]

Java 的 IO 与 NIO - FileChannel、SocketChannel、ServerSocketChannel、Non-blocking Server

FileChannel

FileChannel 是连接到文件的 channel,无法被设置为非阻塞模式,它总是运行在阻塞模式

Opening an FileChannel

需要通过 InputStream、OutputStream 或者 RandomAccessFile 来获取 FileChannel,例如:

RandomAccessFile aFile = new RandomAccessFile("data/nio-data.txt", "rw");
FileChannel inChannel = aFile.getChannel();

Reading Data from a FileChannel

从 FileChannel 中读取数据,示例:

ByteBuffer buf = ByteBuffer.allocate(48);
int bytesRead = inChannel.read(buf);

先分配一个 ByteBuffer。数据是从 FileChannel 读取到 Buffer 的。

然后调用 FileChannel.read() 方法,将数据从 channel 读取到 buffer。read() 方法返回的 int 指明有多少个字节被写入到 Buffer,返回 -1 则表示已经到达文件结尾。

Writing Data to a FileChannel

将数据写入到 FileChannel,示例:

String newData = "New String to write to file..." + System.currentTimeMillis();

ByteBuffer buf = ByteBuffer.allocate(48);
buf.clear();
buf.put(newData.getBytes());

buf.flip();

while(buf.hanRemaining()){
	channel.write(buf);
}

其中 FileChannel.write() 方法是在 while 循环中中调用的。因为无法知道有多少个字节会被写入大 FileChannel,所以重复调用 write() 方法知道 buffer 中无更多的字节要被写入。

Closing a FileChannel

使用完 FileChannel 之后必须关闭

channel.close()

FileChannel Position

读写 FileChannel 都是在一个指定的 position 上完成的。可以通过 FileChannel.position() 方法获取 position

同样可以通过这个方法来指定 position

long pos = channel.position();
channel.position(pos + 123);

读 channel 的时候,如果 position 超过了文件,则返回 -1。

写 channel 的时候,如果 position 超过了文件,则扩展到 positon 的大小位置,但是这样会导致物理文件在磁盘上的空隙。

FileChannel Size

channel.size()

FileChannel Truncate

channel.truncate(1024);

FileChannel Force

FileChannel.force() 方法将所有未写入的数据写到磁盘上。其中布尔参数表名是否保存文件元数据(权限等)

SocketChannel

SocketChannel 是连接到 TCP 网络 socket 的 channel。有两种创建的方式

  1. 打开一个 SocketChannel 然后连接到网络上的某个服务
  2. 当连接到达一个 ServerSocketChannel 的时候创建 SocketChannel

Opening a SocketChannel

SocketChannel socketChannel = SocketChannel.open();
socketChannel.connect(new InetSocketAddress("http://www.baidu.com",80));

Closing a SocketChannel

使用完毕后需要关闭

socketChannel.close();

Reading from a SocketChannel

ByteBuffer buf = ByteBuffer.allocate(48);

int bytesRead = socketChannel.read(buf);

Writing to a SocketChannel

String newData = "New String to write to file..." + System.currentTimeMillis();

ByteBuffer buf = ByteBuffer.allocate(48);
buf.clear();
buf.put(newData.getBytes());

buf.flip();

while(buf.hasRemaining()) {
    channel.write(buf);
}

Non-blocking Mode

将 SocketChannel 设为非阻塞模式,则可以在异步模式下调用 connect()、read()、write() 方法

connect()

非阻塞模式下可以调用 connect() 方法,方法可能在连接建立之前就返回了,需要调用 finishConnect() 方法来查看连接是否被建立。

socketChannel.configureBlocking(false);
socketChannel.connect(new InetSocketAddress("http://baidu.com", 80));

while(! socketChannel.finishConnect() ){
    //wait, or do something else...    
}
write()

方法可能在写入任何数据之前就返回了,因此需要在循环中调用 write() 方法。参见上一个写数据到 SocketChannel 的代码

read()

方法可能在没有读取数据之前就返回了,因此需要在循环中调用 read() 方法。需要注意返回的 int 值,其表示的是读取到的字节的数量。

Non-blocking Mode with Selectors

非阻塞模式下 SocketChannel 和 Selector 一起工作时会更有效率。

ServerSocketChannel

Java NIO ServerSockerChannel 可以监听 TCP 连接,和标准 Java 网络中的 ServerSocket 相同。示例:

ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();

serverSocketChannel.socket().bind(new InetSocketAddress(9999));

while(true){
    SocketChannel socketChannel = serverSocketChannel.accept();

    //do something with socketChannel...
}

Opening a ServerSocketChannel

调用 ServerSocketChannel.open() 方法来打开一个 ServerSocketChannel

ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();

Closing a ServerSocketChannel

调用 serverSocketChannel.close() 关闭 ServerSocketChannel

Listening for Incoming Connections

监听接入的连接,可以调用 ServerSocketChannel.accept() 方法来实现。accept() 方法会阻塞到一个新的连接进来,方法会返回一个带有接入连接的 SocketChannel

可以使用 while 循环来一直监听新的连接事件

while(true){
    SocketChannel socketChannel =
            serverSocketChannel.accept();

    //do something with socketChannel...
}

其中可以使用一些停止条件代替 true

Non-blocking Mode

非阻塞模式中,accept() 方法会立即返回,如果没有连接接入的话,可能会返回 null。因此需要判断返回的 SocketChannel 是否为 null

ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();

serverSocketChannel.socket().bind(new InetSocketAddress(9999));
serverSocketChannel.configureBlocking(false);

while(true){
    SocketChannel socketChannel =
            serverSocketChannel.accept();

    if(socketChannel != null){
        //do something with socketChannel...
        }
}

Non-blocking Server

非阻塞服务是很难设计好的,即使知道了它的特征是在 Selector、Channel、Buffer,想设计好仍然会是巨大的挑战。

Non-blocking IO Pipelines

non-blocking IO pipeline 是一系列处理非阻塞 IO 的组件链,包括在非阻塞 fashion 上对 IO 的读写。一个简单的链条如下

Channel -> Selector -> Component -> Channel

组件使用 Selector 来检查 Channel 是否有可读数据。有的话读取数据并根据数据进行输出,写入到下一个 Channel。

非阻塞 IO pipeline 不需要既读又写,可以只读数据,或者只写数据

可以有多个组件,根据 pipeline 的需求来决定非阻塞 IO pipeline 的长度

可以从多个 Channel 同时读取数据。例如从多个 SocketChannel 读取数据

控制权在于组件,它决定是否从 Channel 读取数据和输出数据

Non-blocking vs. Blocking IO Pipelines

两者最大的区别在于数据是如何从下层的 Channel 读取数据的。

IO pipeline 通常从来自 socket 或者文件 stream 读取数据,并且将数据分割成连续的信息体

Stream -> Message Reader -> Message -> Message -> Message

阻塞 IO 使用 InputStream 类似的接口,每次从下层 Channel 读取 一个字节,接口被阻塞直到能有数据读取。

阻塞 IO 接口简化了信息读取实现类,不需要考虑没有读取到数据或者只读取了部分的情况。同样,阻塞 IO 接口减缓了信息写入实现类,不需要考虑只有部分数据被写入。

Blocking IO Pipeline Drawbacks

阻塞 IO pipeline 的缺点,每个 stream 都需要一个线程来分隔成信息,会导致内存占用过高。

可以使用线程池来降低线程数量。但是大量不活跃连接的线程也会阻塞整个线程池。

Basic Non-blocking IO Pipeline Design

非阻塞 IO pipeline 可以使用单个线程来读取多个 stream 的数据。这要求 stream 可以切换到非阻塞模式。此时 stream 可以返回需要读取的 0 个或更多个字节的信息

使用 Java NIO Selector 来避免 0 字节的 stream。一个或多个 SelectableChannel 注册到 Selector 上。调用 select() 或者 selectNow() 只会返回实际有数据的 SelectableChannel

Reading Partial Messages

从 SelectableChannel 中读取的数据不能确定是不是完整或者多出来的。

处理部分信息方面有两个问题

  1. 检测数据块中是否有完整的一条信息
  2. 在信息的剩余部分到达前应该怎么处理这部分信息

检测是否是含有完整数据,要非常快才行。

需要将这部分信息存储起来知道剩余信息到达

检测和存储这部分信息都由信息处理类来做。为避免混淆从不同 Channel 来的消息数据,可以对每个 Channel 使用一个 Message Reader

检索到来自 Selector 的 Channel 实例后,Message Reader 关联 Channel 并试图将数据弄成小的消息。

Storing Partial Messages

使用 Message Reader 来存储不完整的信息,有两种设计考量

  1. 尽可能少的去复制信息,会影响性能
  2. 完整的信息尽可能存放在连续的字节序列中,这样解析起来简单
A Buffer Per Message Reader

部分信息需要存放在某种 buffer 中,它的实现存在于 Message Reader 内部。buffer 可存放的最大字节是多少?大于最大消息体?会导致占用内存过高

Resizable Buffers

另一个选项是实现一个动态调节的 buffer。开始的时候小,可随着消息体的大小而变化。有多实现思路

Resize by Copy

开始的时候分配 4kb,如果消息无法放入 4KB buffer,再分配一个 8KB buffer,将数据从 4KB buffer 复制到大些的 Buffer 中

好处是所有信息都是在一个连续的字节数组中。坏处是对于大消息来说会有很多的数据复制发生。

减少复制的方法是分析系统中信息大小找出能减少复制的 buffer size,层层递进,如:

4KB、128KB、maximum message size。

消息处理完之后要清空分配的内存,下个接收到数据仍然从最小 buffer size 开始。

Resize by Append

另一个方式是让多个数组组成 buffer。想要调整 buffer,只需要分配另一个字节数组然后将数据写入此数组。

两种方法。一是分配不同的字节数组,将它们放在一个 list 中。另一个是分配分开的大量的可分享的字节数组,然后将它们放在一个分隔的分配到 buffer中。

有点是在使用 appending 分隔的数组或者切割 buffer 过程中,在写入的过程中没有复制。所有的数据可以直接从一个 socket (channel ) 复制到切片或数组中

增长是式 buffer 的缺点在于数据没有存储在一个单个的连续的数组。这让信息解析起来更困难,需要解析器同时去查找每个单独数组的尾部和所有数组的结尾。由于需要查找信息的结束部分,这个模型不怎么好实现。

TLV Encoded Messages

一些协议的信息格式编码为 TLV 格式,即 类型(TYPE)、长度(LENGTH)、值(VALUE),这样可以知道存储整个信息需要多少分配多少内存

缺点是万一有连接很慢的大信息过来,可能会占用了所有的内存导致服务不可用

一种解决方法是使用一种存储多个 TLV 字段的信息格式。这样,内存是分配给各个字段的,而不是分配给信息的,而且内存只在有这个字段有的时候才会分配。当然,一个很大的字段还是会出现和之前一样的内存方面的问题。

另一个方法是对10~15秒仍然未全部接收到的信息进行超时处理,这会让服务器从很多同时到达的大消息体中恢复,但这仍然会让服务器在一段时间内无法响应。蓄意的 DoS 攻击也可能让服务器的内存被占满。

TLV 有许多不同的格式,也可以是长度字段在前,然后是类型字段,最后是值字段(LTV 编码)格式。

TLV 编码方式让 HTTP1.1 看起来是个很差劲的协议,在 HTTP2.0 中修复了这些问题,传输数据使用了 TLV 编码帧。

Writing Partial Messages

在非阻塞模式中,使用 write(ByteBuffer) 方法在 Channel 中写数据的话,无法保证会有多少 ByteBuffer 中的字节被写入。不过可以通过方法返回的写入数据字节个数来追踪。

可以创建一个 Message Writer,每个要写入数据的 Channel 都创建一个。这个对象内部保存追踪到的正在写入消息的字节的数量。

为了防止 Message Writer 中到达消息数量超过它可以直接写入到 Channel 中的数量,其中的消息需要排成队列,这样就可以尽快的写入消息。

为了保证能早些传输部分的数据,Message Writer 需要一直被调用。

可以使用 Selector 来监测 Channel 是否准备好写入。可以使用两个步骤来避免监测所有的 Channel 准备写入的情况

  1. 当信息被写入到 Message Writer,将其关联的 Channel 注册到 Selector
  2. 当服务有空闲时间时,监测注册到 Selector 上的 Channel 是否准备写入。每个只写的 Channel 关联的 Message Writer 被要求将数据写入到 Channel。当 Message Writer 写完了所有的数据到 Channel ,该 Channel 从 Selector 中注销掉。

Putting it All Together

非阻塞 server 需要一直监测到来的数据,确保新消息和全部消息都被接收到。也要一直监测数据的写入,防止只写入的部分数据。

总的来说非阻塞 server 以需要执行的三个 pipeline 结束

  1. 读取的 pipeline,为所有打开的连接监测到来的数据
  2. 处理 pipeline,处理所有接收到的完整的信息
  3. 写入的 pipeline,监测是否可以写入任何输出的信息到任何打开的连接

这三个 pipeline 一直在循环执行,可以优化其过程

Server Thread Model

Github Repository 上的非阻塞服务实现使用了一个带有2个线程的线程模型。第一个线程使用 ServerSocketChannel 接收到来的连接。第二个线程处理接收的连接,包括读信息,处理信息和写响应信息并返回到连接。