IO 与 NIO(三)- AsynchronousFileChannel、DatagramChannel、Pipe、IO 与 NIO 区别、NIO path & file

5 分钟读完

[TOC]

Java 的 IO 与 NIO - AsynchronousFileChannel、DatagramChannel、Pipe、IO 与 NIO 区别、NIO path & file

AsynchronousFileChannel

AsynchronousFileChannel 允许异步的读取或者写入数据

Creating an AsynchronousFileChannel

通过静态方法 open() 来创建 AsynchronousFileChannel

Path path = Paths.get("data/test.xml");

AsynchronousFileChannel fileChannel =
    AsynchronousFileChannel.open(path, StandardOpenOption.READ);

Reading Data

有两种从 AsynchronousFileChannel 中读取数据的方法,都是调用 read() 方法

Reading Data Via a Future

第一种是调用 read() 方法,返回一个 Feture 对象

Future<Integer> operation = fileChannle.read(buffer, 0);

read() 方法会立即返回,即使读取操作还没有完成。可以通过调用 Future 中的 isDone() 方法来检查操作是否完成

AsynchronousFileChannel fileChannel = 
    AsynchronousFileChannel.open(path, StandardOpenOption.READ);

ByteBuffer buffer = ByteBuffer.allocate(1024);
long position = 0;

Future<Integer> operation = fileChannel.read(buffer, position);

while(!operation.isDone());

buffer.flip();
byte[] data = new byte[buffer.limit()];
buffer.get(data);
System.out.println(new String(data));
buffer.clear();

Reading Data Via a CompletionHandler

第二种是调用带 CompletionHandler 参数的 read() 方法。

fileChannel.read(buffer, position, buffer, new CompletionHandler<Integer, ByteBuffer>() {
    @Override
    public void completed(Integer result, ByteBuffer attachment) {
        System.out.println("result = " + result);

        attachment.flip();
        byte[] data = new byte[attachment.limit()];
        attachment.get(data);
        System.out.println(new String(data));
        attachment.clear();
    }

    @Override
    public void failed(Throwable exc, ByteBuffer attachment) {

    }
});

一旦读取操作结束,CompletionHandler 的 completed() 方法会被调用,其第一个参数是读取的自己数量,第二个参数是读取的 channel;如果读取失败,failed() 方法会被调用。

Writing Data

用两种写入数据的方法,都是调用 write() 方法

Writing Data Via a Future

第一种方法同样是通过 Future 来实现异步。

Path path = Paths.get("data/test-write.txt");
AsynchronousFileChannel fileChannel = 
    AsynchronousFileChannel.open(path, StandardOpenOption.WRITE);

ByteBuffer buffer = ByteBuffer.allocate(1024);
long position = 0;

buffer.put("test data".getBytes());
buffer.flip();

Future<Integer> operation = fileChannel.write(buffer, position);
buffer.clear();

while(!operation.isDone());

System.out.println("Write done");

需要注意的是,在代码执行之前文件必须存在,否则会报错。可以添加如下代码保证文件存在

if(!Files.exists(path)){
    Files.createFile(path);
}

Writing Data Via a CompletionHandler

可以同样通过 CompletionHandler 来替代 Future 通知写入操作已经完成。

Path path = Paths.get("data/test-write.txt");
if(!Files.exists(path)){
    Files.createFile(path);
}
AsynchronousFileChannel fileChannel = 
    AsynchronousFileChannel.open(path, StandardOpenOption.WRITE);

ByteBuffer buffer = ByteBuffer.allocate(1024);
long position = 0;

buffer.put("test data".getBytes());
buffer.flip();

fileChannel.write(buffer, position, buffer, new CompletionHandler<Integer, ByteBuffer>() {

    @Override
    public void completed(Integer result, ByteBuffer attachment) {
        System.out.println("bytes written: " + result);
    }

    @Override
    public void failed(Throwable exc, ByteBuffer attachment) {
        System.out.println("Write failed");
        exc.printStackTrace();
    }
});

写入操作完成后,completed() 方法会被调用,写入失败则调用 failed() 方法。注意 ByteBuffer 是作为附件传输的,在 CompletionHandler 方法中被传递

DatagramChannel

DatagramChannel 可以用于发送和接收 UDP 包。

Opening a DatagramChannel

DatagramChannel channel = DatagramChannle.open();
channel.socket().bind(new InetSocketAddress(9999));

Receiving Data

调用 receive() 方法

ByteBuffer buf = ByteBuffer.allocate(48);
buf.clear();

channel.receive(buf);

如果接收的包数据大于 buffer 可以容纳的大小,其他数据则会被丢弃

Sending Data

String newData = "New String to write to file system.currentTimeMillis()";

ByteBuffer buff = ByteBuffer.allocate(48);
buf.clear();
buf.put(newData.getByte());
buf.flip();

int bytesSent = channel.send(buf, new InetSocketAddress("baidu.com", 80));

通过 UDP 协议往 “baidu.com” 的 80 端口发送数据。

Connecting to a Specific Address

UDP 是无连接协议,可以通过锁定 DatagramChannel 来发送或读取指定地址的数据包,实现”连接”

channel.connect(new InetSocketAddress("baidu.com", 80));

这时可以使用 read()write() 方法,但是不保证数据能正确传输

int bytesRead = channel.read(buf);

int bytesWrite = channe.write(buf);

Pipe

Pipe 是两个线程之间的单向连接,有 sink channel 和 source channel,数据写入到 sink channel,然后这些数据可以从 source channel 读取

Creating a Pipe

Pipe pipe = Pipe.open();

Writing to a Pipe

要写入到 Pipe,需要访问 sink channel

Pipe.SinkChannle sinkChannel = pipe.sink()

再调用 write() 方法写入到 SinkChannel

String newData = "New String to write to file..." + System.currentTimeMillis();

ByteBuffer buf = ByteBuffer.allocate(48);
buf.clear();
buf.put(newData.getBytes());

buf.flip();

while(buf.hasRemaining()) {
    sinkChannel.write(buf);
}

Reading from a Pipe

要从 Pipe 中读取,需要访问 source channel

Pipe.SourceChannel sourceChannel = pipe.source();

再调用 read() 方法从 source channel 中读取

ByteBuffer buf = ByteBuffer.allocate(48);

int bytesRead = inChannel.read(buf);

IO 与 NIO 区别

Main Differences Betwen Java NIO and IO

IO NIO
Stream oriented Buffer oriented
Blocking IO Non blocking IO
  Selectors

Stream Oriented vs. Buffer Oriented

阻塞 IO 面向 stream,可以一次性读取一个或多个字节,但是数据没有缓存,无法在 stream 中对数据进行前移后移后操作

NIO 面向 buffer,可以前后移动,处理起来更灵活,但是需要检查 buffer 包含了要处理的所有信息,而且读取更多数据到 buffer 中的时候,要保证没有覆盖掉之前未处理的数据

Blocking vs. Non-blocking IO

阻塞 IO,线程在调用读写方法的时候是阻塞的,在处理完之前无法做其他的事

NIO 模式启动一个线程来请求从 channel 读取数据,而且仅会获取到当前能获取的,不会等待,线程就可以继续做其他的事情。写数据也是一样

Selectors

NIO Selector 允许单个线程监控多个输入的 channel。将 channel 注册到 Selector,然后使用单个线程来”选择”可以进行数据处理的 channels,或者准备好可以写入的。

How NIO and IO Influences Application Design

会影响到这些方面

  1. API 调用 NIO 和 IO 类
  2. 处理数据
  3. 处理数据的线程数量
The API Calls

IO 是从 InputStream 中一个字节一个字节的读取,NIO 是将数据读取到一个 buffer,然后从 buffer 开始进行处理

The Processing of Data

IO 使用 InputStream 会这么处理数据

InputStream input = ... ; // get the InputStream from the client socket

BufferedReader reader = new BufferedReader(new InputStreamReader(input));

String nameLine   = reader.readLine();
String ageLine    = reader.readLine();
String emailLine  = reader.readLine();
String phoneLine  = reader.readLine();

数据会被一行一行的处理,处理完之后不能再回去处理该行数据。

NIO 的实现则不同

ByteBuffer buffer = ByteBuffer.allocate(48);

int bytesRead = inChannel.read(buffer);

方法执行完成后,不会知道是否所有的数据都是在 buffer 中的。

为了读取所有的数据,需要多次检查数据的是否全都被读取,例如

ByteBuffer buffer = ByteBuffer.allocate(48);

int bytesRead = inChannel.read(buffer);

while(! bufferFull(bytesRead) ) {
    bytesRead = inChannel.read(buffer);
}

Summary

NIO 允许使用单个或很少的线程来管理多个 channel,开销更小,但是解析数据的话会更复杂

如果需要同时管理成千上万个打开的连接,每个连接仅会有一点数据,NIO 会是优势。如果需要保持很多已经打开的到其他计算机的连接,例如 P2P 网络,NIO 也会是优势。

如果是少量连接但是占用很多带宽,一次发送大量的数据,经典的 IO 服务也许会更合适

Java NIO Path

位于 java.nio.file 包下,java.nio.file.Path 与 java.io.File 类的接口相似,有一些小差异。大多数情况下可以使用 Path 接口替换 File 类。

Creating a Path Instance

import java.nio.file.Path;
import java.nio.file.Paths;

public class PathExample {

    public static void main(String[] args) {

        Path path = Paths.get("c:\\data\\myfile.txt");

    }
}

使用工厂方法 Paths.get() 方法来创建 Path 实例

Creating an Absolute Path

Path path = Paths.get("c:\\data\\myfile.txt"); //on windows

Path path = Paths.get("/home/jakobjenkov/myfile.txt"); //on unix

Creating a Relative Path

Path projects = Paths.get("d:\\data", "projects");

Path file     = Paths.get("d:\\data", "projects\\a-project\\myfile.txt");

文件的实际路径是:d:\data\projects\a-project\myfile.txt

. 编码表明是在当前目录下, .. 表明是在上级目录下

Path currentDir = Paths.get(".");
System.out.println(currentDir.toAbsolutePath());

Path parentDir = Paths.get("..");

String path = "d:\\data\\projects\\a-project\\..\\another-project";
Path parentDir2 = Paths.get(path);

Path.normalize()

normalize() 方法移除所有路径中的 . 和 .. 来是路径正常化

String originalPath =
        "d:\\data\\projects\\a-project\\..\\another-project";

Path path1 = Paths.get(originalPath);
System.out.println("path1 = " + path1);

Path path2 = path1.normalize();
System.out.println("path2 = " + path2);

Java NIO Files

java.nio.file.Files 提供了几个操作文件的方法。和 java.nio.file.Path 实例一起使用

Files.exists()

Path path = Paths.get("data/logging.properties");

boolean pathExists =
        Files.exists(path,
            new LinkOption[]{ LinkOption.NOFOLLOW_LINKS});

Files.createDirectory()

Path path = Paths.get("data/subdir");

try {
    Path newDir = Files.createDirectory(path);
} catch(FileAlreadyExistsException e){
    // the directory already exists.
} catch (IOException e) {
    //something else went wrong
    e.printStackTrace();
}

Files.copy()

Path sourcePath      = Paths.get("data/logging.properties");
Path destinationPath = Paths.get("data/logging-copy.properties");

try {
    Files.copy(sourcePath, destinationPath);
} catch(FileAlreadyExistsException e) {
    //destination file already exists
} catch (IOException e) {
    //something else went wrong
    e.printStackTrace();
}

Overwriting Existing Files

Path sourcePath      = Paths.get("data/logging.properties");
Path destinationPath = Paths.get("data/logging-copy.properties");

try {
    Files.copy(sourcePath, destinationPath,
            StandardCopyOption.REPLACE_EXISTING);
} catch(FileAlreadyExistsException e) {
    //destination file already exists
} catch (IOException e) {
    //something else went wrong
    e.printStackTrace();
}

Files.move()

Path sourcePath      = Paths.get("data/logging-copy.properties");
Path destinationPath = Paths.get("data/subdir/logging-moved.properties");

try {
    Files.move(sourcePath, destinationPath,
            StandardCopyOption.REPLACE_EXISTING);
} catch (IOException e) {
    //moving file failed.
    e.printStackTrace();
}

Files.delete()

Path path = Paths.get("data/subdir/logging-moved.properties");

try {
    Files.delete(path);
} catch (IOException e) {
    //deleting file failed
    e.printStackTrace();
}

Files.walkFileTree()

public interface FileVisitor {

    public FileVisitResult preVisitDirectory(
        Path dir, BasicFileAttributes attrs) throws IOException;

    public FileVisitResult visitFile(
        Path file, BasicFileAttributes attrs) throws IOException;

    public FileVisitResult visitFileFailed(
        Path file, IOException exc) throws IOException;

    public FileVisitResult postVisitDirectory(
        Path dir, IOException exc) throws IOException {

}
Files.walkFileTree(path, new FileVisitor<Path>() {
  @Override
  public FileVisitResult preVisitDirectory(Path dir, BasicFileAttributes attrs) throws IOException {
    System.out.println("pre visit dir:" + dir);
    return FileVisitResult.CONTINUE;
  }

  @Override
  public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
    System.out.println("visit file: " + file);
    return FileVisitResult.CONTINUE;
  }

  @Override
  public FileVisitResult visitFileFailed(Path file, IOException exc) throws IOException {
    System.out.println("visit file failed: " + file);
    return FileVisitResult.CONTINUE;
  }

  @Override
  public FileVisitResult postVisitDirectory(Path dir, IOException exc) throws IOException {
    System.out.println("post visit directory: " + dir);
    return FileVisitResult.CONTINUE;
  }
});

Searching For Files

Path rootPath = Paths.get("data");
String fileToFind = File.separator + "README.txt";

try {
  Files.walkFileTree(rootPath, new SimpleFileVisitor<Path>() {
    
    @Override
    public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
      String fileString = file.toAbsolutePath().toString();
      //System.out.println("pathString = " + fileString);

      if(fileString.endsWith(fileToFind)){
        System.out.println("file found at path: " + file.toAbsolutePath());
        return FileVisitResult.TERMINATE;
      }
      return FileVisitResult.CONTINUE;
    }
  });
} catch(IOException e){
    e.printStackTrace();
}

Deleting Directories Recursively

Path rootPath = Paths.get("data/to-delete");

try {
  Files.walkFileTree(rootPath, new SimpleFileVisitor<Path>() {
    @Override
    public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
      System.out.println("delete file: " + file.toString());
      Files.delete(file);
      return FileVisitResult.CONTINUE;
    }

    @Override
    public FileVisitResult postVisitDirectory(Path dir, IOException exc) throws IOException {
      Files.delete(dir);
      System.out.println("delete dir: " + dir.toString());
      return FileVisitResult.CONTINUE;
    }
  });
} catch(IOException e){
  e.printStackTrace();
}