介绍

  • Java AIO (Asynchronous IO)
    • jdk1.7(NIO2)才是真正的异步AIO
    • 把IO读写操作完全交给操作系统
    • 学习了linux的epoll模式

原理

    • 服务端:AsynchronousServerSocketChannel
    • 客户端:AsynchronousSocketChannel
    • 用户处理器:CompletionHandler接口
      • 实现应用程序向操作系统发起IO请求,当完成后处理具体逻辑,否则做自己该做的事情

异步IO需要操作系统更强的支持。在IO多路复用模型中,事件循环将文件句柄的状态通知给用户线程,由用户线程自行读取数据、处理数据。

而在异步IO模型中,当用户线程收到通知时,数据已经被内核读取完毕,并放在了用户线程指定的缓冲区内,内核在IO完成后通知用户线程直接使用即可。

异步IO模型使用了Proactor设计模式实现了这一机制。

NIO & AIO 对比

属性 同步阻塞IO(BIO) 伪异步IO 非阻塞IO(NIO) 异步IO(AIO)
客户端数:IO线程数 1:1 M:N(M>N) M:1 M:0
阻塞类型 阻塞 阻塞 非阻塞 非阻塞
同步 同步 同步 同步(多路复用) 异步
API使用难度 简单 简单 复杂 复杂
调试难度 简单 简单 复杂 复杂
可靠性 非常差
吞吐量

示例代码

Server

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
package com.bai;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class AIOServer {

public final static int PORT = 8001;
public final static String IP = "127.0.0.1";
private AsynchronousServerSocketChannel server = null;

public AIOServer(){
try {
//同样是利用工厂方法产生一个通道,异步通道 AsynchronousServerSocketChannel
server = AsynchronousServerSocketChannel.open().bind(new InetSocketAddress(IP,PORT));
} catch (IOException e) {
e.printStackTrace();
}
}

//使用这个通道(server)来进行客户端的接收和处理
public void start(){
System.out.println("Server listen on "+PORT);
//注册事件和事件完成后的处理器,这个CompletionHandler就是事件完成后的处理器
server.accept(null,new CompletionHandler<AsynchronousSocketChannel,Object>(){
final ByteBuffer buffer = ByteBuffer.allocate(1024);
@Override
public void completed(AsynchronousSocketChannel result,Object attachment) {
Future<Integer> writeResult = null;
try{
buffer.clear();
result.read(buffer).get(100,TimeUnit.SECONDS);
System.out.println("In server: "+ new String(buffer.array()));
//将数据写回客户端
buffer.flip();
writeResult = result.write(ByteBuffer.wrap("hi".getBytes()));
}catch(InterruptedException | ExecutionException | TimeoutException e){
e.printStackTrace();
}finally{
server.accept(null,this);
try {
System.out.println(writeResult.get());
result.close();
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
} catch (Exception e) {
e.printStackTrace();
}
}
}

@Override
public void failed(Throwable exc, Object attachment) {
System.out.println("failed:"+exc);
}
});
}

public static void main(String[] args) {
new AIOServer().start();
while(true){
try {
Thread.sleep(Integer.MAX_VALUE);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}

Client

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
package com.bai;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;

public class AIOClient {

public static void main(String[] args) throws IOException {

final AsynchronousSocketChannel client = AsynchronousSocketChannel.open();

InetSocketAddress serverAddress = new InetSocketAddress("127.0.0.1", 8001);

CompletionHandler<Void, ? super Object> handler = new CompletionHandler<Void, Object>() {

@Override
public void completed(Void result, Object attachment) {
final ByteBuffer bb = ByteBuffer.allocate(1024);
client.write(ByteBuffer.wrap("hello".getBytes()), null, new CompletionHandler<Integer, Object>() {

@Override
public void completed(Integer result, Object attachment) {
final ByteBuffer buffer = ByteBuffer.allocate(1024);
client.read(buffer, buffer, new CompletionHandler<Integer, ByteBuffer>() {

@Override
public void completed(Integer result, ByteBuffer attachment) {
buffer.flip();
System.out.println("In client: "+new String(buffer.array()));

}

@Override
public void failed(Throwable exc, ByteBuffer attachment) {
}
});
}

@Override
public void failed(Throwable exc, Object attachment) {
}
});
}

@Override
public void failed(Throwable exc, Object attachment) {
}
};

client.connect(serverAddress, null, handler);
while(true){
try {
Thread.sleep(Integer.MAX_VALUE);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}