IO 与 NIO(一)- Channel、Buffer、Selector
[TOC]
Java 的 IO 与 NIO - Channel、Buffer、Selector
IO 模型
阻塞 IO 模型
在读写数据过程中会发生阻塞现象。当用户线程发出 IO 请求后,内核会去查看数据是否就绪,如果没有就绪就会等待数据就绪,而用户线程就会处于阻塞状态,用户线程交出 CPU。当数据就绪后,内核会将数据拷贝到用户线程,并返回结果给用户线程,用户线程才解绑 block 状态。
非阻塞 IO 模型
当用户线程发起一个read操作后,并不需要等待,而是马上就得到了一个结果。如果结果是一个Error时,它就知道数据还没准备好,于是它可以再次发送read操作。一旦内核中的数据准备好了,并且又再次受到了用户线程的请求,那么它马上就将数据拷贝到了用户线程,然后返回。所以事实上,在非阻塞IO模型中,用户线程需要不断地询问内核数据是否就绪,也就是说非阻塞IO模型不会交出CPU,而会一直占用CPU。典型的非阻塞IO模型一般如下:
while(true){
data = socket.read();
if(data != error){
处理数据
break;
}
}
但是对于非阻塞IO就有一个严重的问题,在while循环中需要不断地去询问内核数据是否就绪,这样会导致CPU占用率过高,因此一般情况下很少使用while循环这种方式来读取数据。
多路复用 IO 模型
多路复用IO模型是目前使用比较多的模型。Java NIO实际上就是多路复用IO。在多路复用IO模型中,会有一个线程不断去轮询多个socket的状态,只有当socket真正有读写事件时,才真正调用实际的IO读写操作。因为在多路复用IO模型中,只需要使用一个线程就可以管理多个socket,系统不需要建立新的线程或者进程,也不必维护这些线程和进程,并且只有在真正有socket读写事件进行时,才会使用IO资源,所以它大大减少了资源占用。在Java NIO中,是通过selector.select()去查询每个通道是否有到达事件,如果没有事件,则一直阻塞在那里,因此这种方式会导致用户线程的阻塞。多路复用IO模式,通过一个线程就可以管理多个socket,只有在真正有socket读写事件进行时,才会使用IO资源进行实际的读写操作。因此,多路复用IO比较适合连接数比较多的情况。
另外多路复用IO为何比非阻塞IO模型的效率高是因为在非阻塞IO中,不断地询问socket状态时通过用户线程去进行的,而在多路复用IO中,轮询每个socket状态是内核进行的,这个效率比用户线程要高的多。
不过要注意的是,多路复用IO模型是通过轮询的方式来检测是否有事件到达,并且对到达的事件逐一进行相应。因此对于多路复用IO模型来说,一旦事件响应体很大,那么会导致后续的事件迟迟得不到处理,并且会影响新的事件轮询。
信号驱动 IO 模型
在信号驱动IO模型中,当用户线程发起一个IO请求操作,会给对应的socket注册一个信号函数,然后用户线程会继续执行,当内核数据就绪时会发送一个信号给用户线程,用户线程接收到信号之后,便在信号函数中调用IO读写操作来进行实际的IO请求操作。
异步 IO 模型
异步IO模型才是最理想的IO模型,在异步IO模型中,当用户线程发起read操作之后,立刻就可以开始去做其它的事。而另一方面,从内核的角度,当它收到一个asynchronous read之后,它会立刻返回,说明read请求一个成功发起了,因此不会对用户线程产生任何block。然后,内核会等待数据准备完成,然后将数据拷贝到用户线程,当这一切都完成之后,内核会给用户线程发送一个信号,告诉它read操作完成了。也就是说用户线程完全不需要实际的整个IO操作是如何进行的。只需要先发起一个请求,当接收内核返回的成功信号时表示IO操作已经完成,可以直接去使用数据了。
也就是说在异步IO模型中,IO操作的两个阶段都不会阻塞用户线程,这两个阶段都是由内核自动完成的。然后发送一个信号告知用户线程操作已经完成。用户线程中不需要再次调用IO函数进行具体的读写。这点是和信号驱动模型有所不同的,在信号驱动模型中,当用户线程接收到信号表示数据已经就绪,然后需要用户线程调用IO函数进行实际的读写操作;而在异步IO模型中,收到信号表示IO操作已经完成,不需要再在用户线程中调用IO函数进行实际的读写操作了。
注意:异步IO是需要操作系统的底层支持的,在java7中,提供了Asynchronous IO。
Java NIO
标准 IO API 使用字节流和字符流。NIO 使用 channels 和 buffers。数据总是从一个 channel 读取到一个 buffer,或者从一个 buffer 写入到一个 channel
非阻塞 IO,例如:一个线程可以让一个 channel 读取数据到一个 buffer,与此同时,线程可以去做别的事情。等数据读取到 buffer 之后,线程就可以继续对数据进行处理了。
selectors。selector 是一个可以监控多个 channels 事件的对象。因此,一个线程可以使用 selector 监控多个 channels 的数据。
核心 API
Java NIO 的核心 API : Channels、Buffers、Selectors
Channel 的主要实现类:
- FileChannel
- DatagramChannel
- SocketChannel
- ServerSocketChannel
Buffer 的主要实现类:
-
ByteBuffer
-
CharBuffer
-
DoubleBuffer
-
FloatBuffer
-
IntBuffer
-
LongBuffer
-
ShortBuffer
MappedByteBuffer
selector 的一个使用场景是,如果程序有很多打开的连接(channel),但是每个连接的流量很少,就能很方便的使用 selector。这类似于一个聊天服务的场景。
使用 selector 之前,需要将 channels 注册到 selector 上。然后可以调用它的 select() 方法。这个方法将一直阻塞直到其中一个 channel 有事件发生。一旦方法返回,线程就可以处理这些事件。例如有新的连接进来,接收到数据等等。
Channel
channel 类似于 stream,但是有这些区别:
-
可以往一个 channel 里同时 读写 。stream 是典型的单向(读或写)
- channels 可以异步的读写
- channels 总是从一个 buffer 中读或写
实现类
FileChannel::从一个文件中读或写
DatagramChannel:通过 UDP 网络协议读写数据
SocketChannel:通过 TCP 网络协议读写数据
ServerSocketChannel:可以允许监听 TCP 连接,每个进来的连接都会相应的创建一个 SocketChannel 。
RandomAccessFile accessFile = new RandomAccessFile("wordcount/nio-data.txt", "rw");
FileChannel fileChannel = accessFile.getChannel();
ByteBuffer buffer = ByteBuffer.allocate(48);
int bytesRead = fileChannel.read(buffer);
while (bytesRead != -1) {
System.out.println("read " + bytesRead);
buffer.flip();
while (buffer.hasRemaining()) {
System.out.print((char) buffer.get());
}
buffer.clear();
bytesRead = fileChannel.read(buffer);
}
accessFile.close();
其中 buffer.flip()
就是用来反转对 Buffer 的读写操作的,刚开始是写入 buffer,然后是从 buffer 中读取。
Buffer
数据是从 channel 读取到 buffer,从 buffer 写入到 channel。
buffer 本质上是一个内存块,用于写入数据,之后再读取。这个内存块使用 NIO 的 Buffer 对象进行包装的,它提供了一系列的方法简化了对此内存块的操作。
基本的用法:
- 将数据写入到 Buffer
- 调用
buffer.flip()
方法,将写模式切换到读模式 - 从 Buffer 中读取出数据
- 调用
buffer.clear()
或buffer.compact()
方法
clear()
方法会清空 buffer 中所有的数据,compact()
方法则只清空已经读取的数据,此时未读取的数据会进入buffer 的前排,新的数据会被写入到这些数据之后。
Buffer 的三个属性
容量(capactiy)、位置(position)、限制(limit)
capcacity:内存块的固定大小。如果满了,则只能清空(将数据读取出或者清除)后再写入。
position:写模式中,position 从 0 开始指向内存块中已写入数据的位置,最大值是 capacity-1
。切换模式到读模式后,posiition 被重设为 0,读数据过程中指向下一个将要读取的数据的位置。
limit:写模式中,limit 即能写入的数据多少,与 capacity 相同。读模式中,limit 为可以读取的数据的大小,,即写模式中写入的数据的位置。
Buffer 类型
8中类型,ByteBuffer、MappedByteBuffer、CharBuffer、DoubleBuffer、FloatBuffer、IntBuffer、LongBuffer、ShortBuffer
其中 MappedByteBuffer 有些特殊。
分配一个 Buffer:在获取到一个 Buffer 对象之前,必须先进行分配。示例中分配了一个容量为 48 字节的 ByteBuffer 对象,和一个容量为 1024 字符的 CharBuffer 对象。
ByteBuffer buf = ByteBuffer.allocate(48);
CharBuffer buf = CharBuffer.allocate(1024);
将数据写入到 Buffer
-
将数据从 channel 写入到 buffer
-
自己将数据写入到 buffer,通过 buffer 的
put()
方法
示例:
int bytesRead = inChannel.read(buf);
buf.put(127 );
还可以通过其他重载的 put()
方法,实现写入到指定位置、写入字节数组等到 buffer 中
flip()
flip()
方法将 buffer 从写模式切换到读模式,会将 position 重置为0,将 limit 置为之前 position 所在的位置
从 Buffer 中读取数据
- 将书从 buffer 读取到 channel
- 自己从 buffer 中读取数据,使用
get()
方法中的一个
示例
int bytesWritten = inChannel.write(buf);
byte aByte = buf.get();
还可以通过其他重载的 get()
方法,实现读取指定位置的数据、读取字节数组等
rewind()
rewind()
方法将 position 置为 0,这样就可以重新从 buffer 中读取所有的数据。limit 数值不变。
clear() 和 compact()
clear()
方法会将 position 置为 0,limit 置为 capacity 的值。这样相当于清除了 buffer 中的数据,但是实际数据是没有清除的,仅仅是对标记进行了更新设置。
compact()
方法将所有未读数据复制到 buffer 的开始位置,然后将 position 设到最后一个未读元素的后面,limit 仍然是设置为 capacity,和 clear()
方法一样。这样就是从未读数据之后开始进行写入。
mark() 和 reset()
可以通过 Buffer.mark()
方法标记一个指定的 position。之后可以通过 Buffer.reset()
方法重置 position 到原来的位置。
equals() 和 compareTo()
可以通过 equals()
和 compareTo()
方法比较两个 buffer
相等的比较:类型相同,并且存放的数据(字节、字符等)的数量相同,并且所有存放的数据相同
大小的比较(小于的情况):第一个元素小于对方第一个元素,或者元素都相等但是元素少些
Scatter / Gather
提供向多个 channel 进行读取或者写入的支持
Scattering Reads
从单个 channel 读取数据导多个 buffer,示例:
ByteBuffer header = ByteBuffer.allocate(128);
ByteBuffer body = ByteBuffer.allocate(1024);
ByteBuffer[] bufferArray = { header, body};
channel.read(bufferArray);
先将 buffer 放到数组里,然后将数组其作为参数传递到 channel.read()
方法中,read()
方法将按数组中 buffer 的顺序从 channel 读取数据后写入到 buffer,一旦第一个 buffer 满了之后,channel 将移动到下一个 buffer 进行写入。
由于需要将一个 buffer 写满之后才会对下一个 buffer 进行写入,这样就不适合动态大小的消息体了。所以如果 header 的大小是固定的,那么 scattering 读才适用。
Gathering Writes
从多个 buffer 写数据到单个 channel,示例:
ByteBuffer header = ByteBuffer.allocate(128);
ByteBuffer body = ByteBuffer.allocate(1024);
ByteBuffer[] bufferArray = {header, body};
channel.writer(bufferArray);
buffer 数组传入到 write()
方法中,方法将会按照数组顺序将 buffer 中的内容写入到 channel,其中只有 buffer 中 position 到 limit 的数据会被写入。因此 gathering write 适用于动态大小的信息,与 scattering read 不相同。
Channel to Channel Transfers
NIO 允许直接将数据从一个 channel 传输到另一个 channel,如果其中一个 channel 是一个 FileChannel 的话。FileChannel 类有 transferTo()
和 transferFrom()
两个可供使用的方法。
#####transferFrom()
FileChannel.transferFrom()
方法将数据从一个源 channel 传输到一个 FileChannel
RandomAccessFile fromFile = new RandomAccessFile("fromFile.txt", "rw");
FileChannel fromChannel = fromFile.getChannel();
RandomAccessFile toFile = new RandomAccessFile("toFile.txt", "rw");
FileChanel toChannel = toFile.getChannel();
long position = 0;
long count = fromChannel.size();
toChannel.transferFrom(fromChannel, position, count);
参数中 position 声明目标文件中其实位置,count 声明传输的字节的多少。源 channel 中少于该字节数量的数据也可以被传输。
此外,一些 SocketChannel 实现也许会仅仅传输 SocketChannel 中当时有的数据,即便 SocketChannel 会在后来有了更多的数据。因此,它也许不会传输 SocketChannel 中所有的请求数据到 FileChannel
transferTo()
transferTo()
方法从一个 FileChannel 传输数据导另一个 channel
RandomAccessFile fromFile = new RandomAccessFile("fromFile.txt", "rw");
FileChannel fromChannel = fromFile.getChannel();
RandomAccessFile toFile = new RandomAccessFile("toFile.txt", "rw");
FileChanel toChannel = toFile.getChannel();
long position = 0;
long count = fromChannel.size();
fromChannel.transferTo(position, count, toChannel);
SocketChannel 中的问题同样会在 transferTo()
方法中出现,SocketChannel 的实现也许只会传输 FileChannel 中的数据直到第二个 buffer 已经满了,然后停止。
Selector
Selector 是一个可以检测一个或者多个 Java NIO Channel 实例的组件,它可以决定哪些 channel 是否能够读写。这样单个线程就可以管理多个 channel,或者多个网络连接。
创建 Selector
使用 Selector.open()
方法来创建一个 Selector
Selector selector = Selector.open();
注册 Channel 到 Selector
要使用 Selector 来管理 channel,必须要注册 channel 到 Selector。可以使用 SelectableChannel.register()
方法来实现。
channel.configureBlocking(false):
SelectionKey key = channel.register(selector, SelectionKey.OP_READ);
要使用 Selector,channel 必须处于非阻塞模式。这意味着 Selector 不可以使用 FileChannel,因为 FileChannel 无法切换到非阻塞模式。然而 SocketChannel 可以正常工作。
注意到 register()
方法的第二个参数,这是一个”兴趣集合”,代表希望监听到的 channel 的事件。可以监听的事件有:
Connect、Accept、Read、Write
channel 发起事件可以说是为该事件准备好了。channel 如果成功连接则为 connect 准备,服务器 socket channel 接受一个到来的连接是 accept 准备,channel 准备好数据被读取是 read 准备,channel 准备好被写入数据是 write 准备。
对应的四个 SelectionKey 常量:
SelectionKey.OP_CONNECT
SelectionKey.OP_ACCEPT
SelectionKey.OP_READ
SelectionKey.OP_WRITE
如果要监听多个时间,可以这样写:
int interestSet = SelectionKey.OP_READ | SelectionKey.OP_WRITE;
SelectionKey
将 channel 注册到 Selector 的 register()
方法会返回一个 SelectionKey 对象,对象中包含以下属性:
interest set、read set、channel、selector、an attached object
Interest Set
是关注的事件的集合。可以对此集合进行如下读写操作:
int interestSet = selectionKey.interestOps();
boolean isInterestedInAccept = interestSet & SelectionKey.OP_ACCEPT;
boolean isInterestedInConnect = interestSet & SelectionKey.OP_CONNECT;
boolean isInterestedInRead = interestSet & SelectionKey.OP_READ;
boolean isInterestedInWrite = interestSet & SelectionKey.OP_WRITE;
使用 AND 和给定的 SelectionKey 常量来判断该事件是否存在
Ready Set
是 channel 已经准备好的操作的集合。在选择后会首先访问到 ready set。可以这样访问 ready set
int readySet = selectionKey.readyOps();
可以和 interest key 一样测试,哪些事件或操作是 channel 已经准备好了的。但是也可以使用如下方法,返回布尔值:
selectionKey.isAcceptable();
selectionKey.isConnectable();
selectionKey.isReadable();
selectionKey.isWritable();
Channel + Selector
Trivial
Attaching Objects
可以通过添加附件对象或其他消息的方式来方便的识别一个 channel,如添加 channel 对应的 buffer 等
selectionKey.attach(theObject);
Object attachedObj = selectionKey.attachment();
还可以通过 register()
方法在注册 channel 到 selector 的时候来添加对象
SelectionKey key = channel.register(selector, SelectionKey.OP_HEAD, theObject);
Selecting Channel via a Selector
使用 select()
方法来选择一个已经注册到 selector 上的 channel,而该 channel 已经做好监听某个事件的准备
有三个方法:select()、select(long timeout)、selectNow(),其中第一个会阻塞到获取到 channel,第二个会阻塞到一个最大的时间,最后一个会立即返回一个任何准备好的 channel
方法返回的值是准备好的 channel 的个数。如果调用一次方法返回了1,表示有一个 channel 处于准备状态,再调用一次方法,还是返回1,这是另一个 channel 处于准备状态,如果没有在第一次调用的时候对 channel 进行操作,那么现在就有 2 个 channel 处于准备状态了,在每次调用 select()
方法之间仅有一个 channel 会处于准备好的状态。
selectedKeys()
在获取到准备好的 channel 的个数后,selectedKeys()
方法可以通过 “selected key set” 访问准备好的 channel。
Set<SelectionKey> selectedKyes = selector.selectedKeys();
在通过 Channel.register()
方法将 channel 注册到 selector 时,会返回一个 SelectionKey 对象,可以通过 selectedKeySet()
方法来访问这些 keys。
Set<SelectionKey> selectedKeys = selector.selectedKeys();
Iterator<SelectionKey> keyIterator = selectedKeys.iterator();
while(keyIterator.hasNext()){
SelectionKey key = keyIterator.next();
if(key.isAcceptable()){
// a connection was accepted by a ServerSocketChannel.
}
} else if (key.isConnectable()) {
// a connection was established with a remote server.
} else if (key.isReadable()) {
// a channel is ready for reading
} else if (key.isWritable()) {
// a channel is ready for writing
}
keyIterator.remove();
}
这里遍历所有的 key,判断每个key 引用的 channel 准备好了哪些事件。
这里在循环里删除了 key,这里必须这么做,不然下次这些 channel 还会添加到准备好的 “selected key set” 中。
wakeup()
Selector.wakeup()
方法可以将一个调用 select()
方法并被阻塞的线程收回,离开该方法。如果没有线程阻塞在 select()
方法中,那么下一个调用该方法的线程将会立即 “wake up”
close()
当使用完 selector 之后,调用 close()
方法。这会关闭 Selector 并且失效所有注册到 selector 上的 SelectorKey。但是 channel 没有被关闭。
Full Selector Example
Selector selector = Selector.open();
channel.configureBlocking(false);
SelectionKey key = channel.register(selector, SelectionKey.OP_READ);
while(true) {
int readyChannels = selector.selectNow();
if(readyChannels == 0) continue;
Set<SelectionKey> selectedKeys = selector.selectedKeys();
Iterator<SelectionKey> keyIterator = selectedKeys.iterator();
while(keyIterator.hasNext()) {
SelectionKey key = keyIterator.next();
if(key.isAcceptable()) {
// a connection was accepted by a ServerSocketChannel.
} else if (key.isConnectable()) {
// a connection was established with a remote server.
} else if (key.isReadable()) {
// a channel is ready for reading
} else if (key.isWritable()) {
// a channel is ready for writing
}
keyIterator.remove();
}
}