概念

阻塞Block和非阻塞Non-Block

进程在访问数据的时候,数据是否准备就绪的一种处理方式。

  • 当数据没有准备的时候
    • 阻塞
      • 往往需要等待缓冲区中的数据准备好过后才处理其它的事情,否则一直等待
    • 非阻塞
      • 直接返回,不回等待。
IO模型 IO NIO
方式 从硬盘到内存 从内存到硬盘
通信 面向流(乡村公路) 面向缓冲区(高速公路,多路复用技术)
处理 阻塞IO(多线程) 非阻塞IO(反应堆Reactor)
触发 选择器(轮循机制)
    • 每次从流中读一个或多个字节,直至读取所有字节,没有被缓存在任何地方。
    • 不能前后移动流中的数据
      • 需要移动从流中读区的数据,需要先将它缓存到一个缓冲区
  • 缓冲
    • 数据读取到一个它稍后处理的缓冲区,需要时可在缓冲区中前后移动
      • 增加了处理过程中的灵活性
      • 需检查是否该缓冲区中包含所有需要处理的数据
      • 需确保当跟多的数据读入缓冲区时,不要覆盖缓冲区里尚未处理的数据

同步Synchronization和异步Asynchronous

基于应用程序和操作系统处理IO事件所采用的方式。

  • 同步
    • 应用程序直接参与IO读写的操作
    • 处理IO事件时,必须阻塞在某个方法上等待IO事件完成
    • 阻塞IO事件或者通过轮询IO事件的方式
    • 阻塞到IO事件,阻塞到read或者write,不能做其它事情
    • 读写方法加入到线程里面,通过阻塞线程来实现,对线程的性能开销比较大
  • 异步
    • 所有的IO读写交给操作系统去处理,应用程序需要等待通知时,可以去做其它事情

选择器Selector

Java NIO的选择器允许一个单独的线程来监视多个输入通道,可以注册多个通道使用一个选择器,然后使用一个单独的线程来“选择”通道。

这些通道里已经有可以处理的输入或者选择已准备写入的通道。

这种选择机制,使得一个单独的线程很容易来管理多个通道。

NIO和IO如何影响应用程序的设计

  • 对NIO或IO类的API调用
  • 数据处理
  • 用来处理数据的线程数
    • NIO
      • 只使用一个或几个单线程管理多个通道(网络连接或文件)
      • 代价是解析数据可能会从一个阻塞流中读取更复杂
1
2
3
4
5
6
7
8
9
//流方式
FileInputStream input = new FileInputStream("d://info.txt");
BufferedReader reader = new BufferedReader(new InputStreamReader(input));
//返回时数据已经读完
//仅在有新数据读入时才运行,并不知道每步的数据时什么
//一旦正在运行的线程一处理过读入的某些数据
//该线程不会再回退数据
//Stream<-Thread
String nameLine = reader.readLine();
1
2
3
4
//缓冲
//Channel--(fill data)->Buffer<--(check data)-Thread
ByteBuffer buffer = ByteBuffer.allocate(48);
int bytesRead = inChannel.read(buffer);

API调用

  • NIO
    • 数据先读入缓冲区再处理
  • IO
    • 从一个InputStream/Reader逐字节读取

Java NIO

  • Java 1.4
  • 面向Block

核心对象

  • 缓冲区Buffer
  • 通道Channel
  • 选择器Selector

缓冲区Buffer

缓冲区实际上是一个容器对象,更直接的说,其实就是一个数组。NIO库中,所有数据都是用缓冲区处理的。从缓冲区读,写入到缓冲区。

所有的缓冲区类型都继承于抽象类Buffer。

  • Buffer
    • ByteBuffer
      • MappedByteBuffer
    • CharBuffer
    • DoubleBuffer
    • FloatBuffer
    • IntBuffer
    • LongBuffer
    • ShortBuffer
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
import java.nio.IntBuffer;
public class TestIntBuffer {
public static void main(String[] args) {
// 分配新的 int 缓冲区,参数为缓冲区容量
// 新缓冲区的当前位置将为零,其界限(限制位置)将为其容量。它将具有一个底层实现数组, 其数组偏移量将为零。
IntBuffer buffer = IntBuffer.allocate(8);
for (int i = 0; i < buffer.capacity(); ++i) {
int j = 2 * (i + 1);
// 将给定整数写入此缓冲区的当前位置,当前位置递增
buffer.put(j);
}
// 重设此缓冲区,将限制设置为当前位置,然后将当前位置设置为 0
buffer.flip();
// 查看在当前位置和限制位置之间是否有元素
while (buffer.hasRemaining()) {
// 读取此缓冲区当前位置的整数,然后当前位置递增
int j = buffer.get();
// 2 4 6 8 10 12 14 16
System.out.print(j + " ");
}
}
}
深入剖析Buffer

一个特殊的数组,缓冲区对象内置了一些机制,能够跟踪和记录缓冲区的状态变化情况,如果我们使用 get()方法从缓冲区获取数据,或者使用 put()方法把数据写入缓冲区,都会引起缓冲区状态的变化。

重要属性

  • position
    • 指定了下一个将要被写入或者读取的元素索引
    • 它的值由 get()/put()方法自动更新
    • 在新创建一个Buffer对象时,position被初始化为0。
  • limit
    • 指定还有多少数据需要取出(在从缓冲区写入通道时)
    • 或者还有多少空间可以放入数据(在从通道读入缓冲区时)
  • capacity
    • 指定了可以存储在缓冲区中的最大数据容量
    • 实际上,它指定了底层数据的大小
    • 或者至少是指定了准许使用的底层数组的容量

0 <=position<=limit<=capacity

都是在JVM允许范围内

例子:

  • 创建一个新的容量大小为10的ByteBuffer对象
    • 初始化时
      • position为0
      • limit和capacity被设置为10
    • 使用过程中
      • capacity不会发生变化
      • position和limit会变化
    • 读取4个数据
      • 从通道读取数据,相当于往缓冲区中写入数据
      • position为4
        • 下一个将要被写入的字节索引为4
      • limit依旧为10
    • 读取的数据写入到通道中,相当于从缓冲区中读取数据
      • 调用flip()
        • 把limit设置为当前的position值
          • 保证读取的数据正好是之前写入到缓冲区中的数据
        • 把position设置为0
          • 可以保证在下一步输出时读取到的是缓冲区的第一个字节
      • 调用get()
        • 从缓冲区中读取数据写入到输出通道
        • position增加
        • limit不变
        • position不会超过limit
      • 读取之前写入到缓冲区的4个字节之后
        • position和limit都为4
      • 从缓冲区读取数据完毕后
        • limit的值仍然保持在调用flip()方法时的值
    • 调用clear()能够把所有的状态变化设置为初始化时的值
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
import java.io.FileInputStream;
import java.nio.*;
import java.nio.channels.*;
/**
* output:
初始化 :
capacity: 10, position: 0, limit: 10

调用 read() :
capacity: 10, position: 10, limit: 10

调用 flip() :
capacity: 10, position: 0, limit: 10

调用 get() :
capacity: 10, position: 10, limit: 10

调用 clear() :
capacity: 10, position: 0, limit: 10
*/
public class Test {

public static void main(String[] args) throws Exception {
FileInputStream fin = new FileInputStream("/Users/2bai/Downloads/test.txt");
FileChannel fc = fin.getChannel();
ByteBuffer buffer = ByteBuffer.allocate(10);
output("初始化", buffer);
fc.read(buffer);
output("调用 read()", buffer);
buffer.flip();
output("调用 flip()", buffer);
while (buffer.remaining() > 0) {
byte b = buffer.get();
// System.out.print(((char)b));
}
output("调用 get()", buffer);

buffer.clear();
output("调用 clear()", buffer);
fin.close();
}

public static void output(String step, Buffer buffer) {
System.out.println(step + " : ");
System.out.print("capacity: " + buffer.capacity() + ", ");
System.out.print("position: " + buffer.position() + ", ");
System.out.println("limit: " + buffer.limit());
System.out.println();
}
}
缓冲区的分配

在创建一个缓冲区对象时,会调用静态方法allocate()来指定缓冲区的容量。其实调用allocate()相当于创建了一个指定大小的数组,并把它包装为缓冲区对象。也可以直接将一个的数组,包装为缓冲区对象。

1
2
3
4
5
6
7
8
9
10
import java.nio.ByteBuffer;
public class BufferWrap {
public void myMethod() {
// 分配指定大小的缓冲区
ByteBuffer buffer1 = ByteBuffer.allocate(10);
// 包装一个现有的数组
byte array[] = new byte[10];
ByteBuffer buffer2 = ByteBuffer.wrap( array );
}
}
缓冲区分片

在NIO中,除了可以分配或者包装一个缓冲区对象外,还可以根据现有的缓冲区对象来创建一个子缓冲区,即在现有缓冲区上切出一片来作为一个新的缓冲区。

现有的缓冲区与创建的子缓冲区在底层数组层面上是数据共享的。

子缓冲区相当于是现有缓冲区的一个视图窗口。

调用slice()可以创建一个子缓冲区。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
/**
0
1
2
30
40
50
60
7
8
9
*/
public static void main(String args[]) throws Exception {
ByteBuffer buffer = ByteBuffer.allocate(10);
// 缓冲区中的数据 0-9
for (int i = 0; i < buffer.capacity(); ++i) {
buffer.put((byte) i);
}
// 创建子缓冲区
buffer.position(3);
buffer.limit(7);
ByteBuffer slice = buffer.slice();
// 改变子缓冲区的内容
for (int i = 0; i < slice.capacity(); ++i) {
byte b = slice.get(i);
b *= 10;
slice.put(i, b);
}
buffer.position(0);
buffer.limit(buffer.capacity());
while (buffer.remaining() > 0) {
System.out.println(buffer.get());
}
}
只读缓冲区

只能读取,不能写入。

通过调用asReadOnlyBuffer(),将任何常规缓冲区转换为只读缓冲区,这个方法返回一个与原缓冲区完全相同的缓冲区,并与原缓冲区共享数据,只不过是只读的。

如果原缓冲区的内容发生了变化,只读缓冲区的内容也随之发生变化。

常规缓冲区可以转换为只读缓冲区,
只读缓冲区不能转换为常规缓冲区。

常用于控制权限(回调方法),比如:在将缓冲区的引用传递给某个对象的方法时,你无法确定这个方法是否会修改缓冲区中的数据,这个时候,就可以创建一个只读缓冲区,然后把只读缓冲区的引用传递给那个方法,这样就能保证,使用那个方法的人没办法去修改缓冲区中的数据了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public static void main(String args[]) throws Exception {
ByteBuffer buffer = ByteBuffer.allocate(10);
// 缓冲区中的数据 0-9
for (int i = 0; i < buffer.capacity(); ++i) {
buffer.put((byte) i);
}
// 创建只读缓冲区
ByteBuffer readonly = buffer.asReadOnlyBuffer();
// 改变原缓冲区的内容
for (int i = 0; i < buffer.capacity(); ++i) {
byte b = buffer.get(i);
b *= 10;
buffer.put(i, b);
}
readonly.position(0);
readonly.limit(buffer.capacity());
// 只读缓冲区的内容也随之改变
while (readonly.remaining() > 0) {
System.out.println(readonly.get());
}
}

通道Channel

  • 通道是一个对象,可以读取和写入数据。
  • 所有数据都通过Buffer对象来处理,不会将字节直接写入通道或直接从通道中读取数据。
  • 所有的通道对象都实现了Channel接口。
使用NIO读取数据
  • 从FileInputStream获取Channel
  • 创建Buffer
  • 将数据从Channel读取到Buffer中
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public static void main(String args[]) {
FileInputStream fin = null;
try {
fin = new FileInputStream("/Users/2bai/Downloads/test.txt");
// 获取通道
FileChannel fc = fin.getChannel(); // 创建缓冲区
ByteBuffer buffer = ByteBuffer.allocate(1024); // 读取数据到缓冲区
fc.read(buffer);
buffer.flip();
while (buffer.remaining() > 0) {
byte b = buffer.get();
System.out.print(((char) b));
}
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
fin.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
使用NIO写入数据
  • 从FileInputStream获取Channel
  • 创建Buffer
  • 将数据从Channel写入到Buffer中
1
2
3
4
5
6
7
8
9
public static void main(String args[]) throws Exception {
FileOutputStream fout = new FileOutputStream("/Users/2bai/Downloads/test.txt");
FileChannel fc = fout.getChannel();
ByteBuffer buffer = ByteBuffer.allocate(1024);
buffer.put(Charset.forName("utf8").encode("你好你好你好你好你好"));
buffer.flip();
fc.write(buffer);
fout.close();
}

反应堆Reactor

阻塞I/O通信模型

缺点

  • 当客户端多时,会创建大量的处理线程,且每个线程都要占用栈空间和CPU时间
  • 阻塞可能带来频繁的上下文切换,且大部分上下文切换可能是无意义的。在这种情况下非阻塞I/O就又了它的应用前景
Java NIO原理及通信模型
  • 由一个专门的线程来处理所有的IO事件,并负责分发
  • 事件驱动机制
    • 事件到的时候触发,而不是同步的去监视事件
  • 线程通讯
    • 线程之间通过wait、notify等方式通讯
    • 保证每次上下文切换都是有意义的,减少无谓的线程切换

客户端—>Reactor(分发wait/notify)—>处理线程N(read、decode、compute、encode、send)

每个线程的处理流程大概都是读取数据、解码、计算处理、编码、发送相应。

选择器Seletor

传统的Server/Client模式会基于TPR(Thread per Request),服务器会为每个客户端请求建立一个线程,由该线程单独处理一个客户请求。

问题:线程数量的剧增,大量线程会增大服务器开销。

解决:线程池。

产生问题:线程池中的线程都在处理,超过线程池数量的请求无法处理。

NIO中非阻塞I/O采用了基于Reactor模式的工作方式,I/O调用不会被阻塞,可以注册感兴趣的特定I/O事件,如可读数据到达,新的Socket连接等等,在发生特定事件时,系统再通知。

NIO中实现非阻塞I/O的核心对象就是Selector,Selector就是注册各种I/O事件的地方。而且当事件发生时,就是Selector告诉所发生的事件。

当有读或写等任何注册的事件发生时,可以从Selector中获得相应的SelectorKey,同事从SelectorKey中可以找到发生的事件和该事件所发生的具体SelectableChannel,以获得客户端发送过来的数据。

Select是NIO的核心,底层使用epoll模型。

  • 使用NIO中非阻塞I/O编写服务器处理程序
    • 向Selector对象注册感兴趣的事件
    • 从Selector中获得去感兴趣的事件
    • 根据不同的事件进行相应的处理
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
/*
关键点:
1.创建一个ServerSocketChannel,和一个Selector,并且把这个server channel 注册到 selector上,注册的时间指定,这个channel 所感觉兴趣的事件是 SelectionKey.OP_ACCEPT,这个事件代表的是有客户端发起TCP连接请求。
2.使用 select 方法阻塞住线程,当select 返回的时候,线程被唤醒。再通过selectedKeys方法得到所有可用channel的集合。
3.遍历这个集合,如果其中channel 上有连接到达,就接受新的连接,然后把这个新的连接也注册到selector中去。
4.如果有channel是读,那就把数据读出来,并且把它感兴趣的事件改成写。如果是写,就把数据写出去,并且把感兴趣的事件改成读。
*/

/*
* 注册事件
*/
private Selector getSelector() throws IOException {
// 创建 Selector 对象
Selector sel = Selector.open();
// 创建可选择通道
ServerSocketChannel server = ServerSocketChannel.open();
// 配置为非阻塞模式
server.configureBlocking(false);
// 绑定通道到指定端口
ServerSocket socket = server.socket();
InetSocketAddress address = new InetSocketAddress(port);
socket.bind(address);
// 向 Selector 中注册感兴趣的事件(监听accept事件)
// 新的连接发生时所产生的事件
// OP option
server.register(sel, SelectionKey.OP_ACCEPT);
return sel;
}

/*
* 开始监听
*/
public void listen() {
System.out.println("listen on " + port);
// 非阻塞I/O中,内部循环模式基本都是遵循这种方式
try {
while (true) {
// 该调用会阻塞,直到至少有一个事件发生
selector.select();
// 获取发生事件的SelectionKey
Set<SelectionKey> keys = selector.selectedKeys();
// 迭代器循环
Iterator<SelectionKey> iter = keys.iterator();
while (iter.hasNext()) {
SelectionKey key = (SelectionKey) iter.next();
iter.remove();
process(key);
}
}
} catch (IOException e) {
e.printStackTrace();
}
}

/*
* 根据不同的事件做处理
*/
private void process(SelectionKey key) throws IOException {
// 接收请求
if (key.isAcceptable()) {
ServerSocketChannel server = (ServerSocketChannel) key.channel();
SocketChannel channel = server.accept();
channel.configureBlocking(false);
channel.register(selector, SelectionKey.OP_READ);
}
// 读信息
else if (key.isReadable()) {
SocketChannel channel = (SocketChannel) key.channel();
int len = channel.read(buffer);
if (len > 0) {
buffer.flip();
String name = new String(buffer.array(), 0, len);
SelectionKey sKey = channel.register(selector, SelectionKey.OP_WRITE);
sKey.attach(name);
} else {
channel.close();
}
buffer.clear();
}
// 写事件
else if (key.isWritable()) {
SocketChannel channel = (SocketChannel) key.channel();
String name = (String) key.attachment();
ByteBuffer block = ByteBuffer.wrap(("Hello " + name).getBytes());
if (block != null) {
channel.write(block);
} else {
channel.close();
}
}
}

FileChannel

ServerSocketchannel

SocketChannel

ByteBuffer

MappedByteBuffer

方法

wrap

slice

allocate

allocateDirect

asReadOnlyBuffer

动态改变position的值(油表)

put

get

源码分析

Selector.open()

java.nio.channels.Selector#open

1
2
3
public static Selector open() throws IOException {
return SelectorProvider.provider().openSelector();
}

Selector是调用的SelectorProvider.provider()拿到的provider对象创建的ServerSocketChannel,上文的SelectorPipe都是通过这一个provider对象创建的。

java.nio.channels.spi.SelectorProvider#provider

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public static SelectorProvider provider() {
synchronized (lock) {
//保证了整个server程序中只有一个provider对象
if (provider != null)
return provider;
return AccessController.doPrivileged(
new PrivilegedAction<SelectorProvider>() {
public SelectorProvider run() {
if (loadProviderFromProperty())
return provider;
if (loadProviderAsService())
return provider;
//会根据操作系统来返回不同的实现类
//windows->WindowsSelectorProvider
provider = sun.nio.ch.DefaultSelectorProvider.create();
return provider;
}
});
}
}

对于windows操作系统来说他的实现类就是WindowsSelectorProvider,而WindowsSelectorProvider类中只是实现了openSelector方法。

WindowsSelectorProvider#openSelector

1
2
3
4
5
6
7
8
public AbstractSelector openSelector() throws IOException{
//和操作系统有关系
//调用了操作系统底层的API
// Windows -> WindowsSelectorImpl
// Mac -> KQueueSelectorImpl
// Linux -> PollSelectorImpl
return new WindowsSelectorImpl(this);
}

WindowsSelectorImpl(SelectorProvider)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
WindowsSelectorImpl(SelectorProvider sp) throws IOException { 
super(sp);
//pollWrapper保存selector上注册的FD
//包括pipe的write端FD和ServerSocketChannel所用的FD
pollWrapper = new PollArrayWrapper(INIT_CAP);
//wakeupPipi:通道
//其实就是两个FD:一个read、一个write
//关键点
wakeupPipe = Pipe.open();
wakeupSourceFd = ((SelChImpl)wakeupPipe.source()).getFDVal();
// Disable the Nagle algorithm so that the wakeup is more immediate
SinkChannelImpl sink = (SinkChannelImpl)wakeupPipe.sink();
(sink.sc).socket().setTcpNoDelay(true);
wakeupSinkFd = ((SelChImpl)sink).getFDVal();
pollWrapper.addWakeupSocket(wakeupSourceFd, 0);
}

java.nio.channels.Pipe#open

1
2
3
public static Pipe open() throws IOException {
return SelectorProvider.provider().openPipe();
}

sun.nio.ch.SelectorProviderImpl#openPipe

1
2
3
public Pipe openPipe() throws IOException {
return new PipeImpl(this);
}

sun.nio.ch.PipeImpl

1
2
3
4
5
6
7
8
9
10
11
PipeImpl(SelectorProvider sp) {
int[] fdes = new int[2];
//关键点
IOUtil.initPipe(fdes, true);
FileDescriptor sourcefd = new FileDescriptor();
IOUtil.setfdVal(sourcefd, fdes[0]);
source = new SourceChannelImpl(sp, sourcefd);
FileDescriptor sinkfd = new FileDescriptor();
IOUtil.setfdVal(sinkfd, fdes[1]);
sink = new SinkChannelImpl(sp, sinkfd);
}

sun.nio.ch.IOUtil#initPipe

1
static native void initPipe(int[] fda, boolean blocking);

solaris/native/sun/nio/ch/IOUtil.c

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
JNIEXPORT void JNICALL
Java_sun_nio_ch_IOUtil_initPipe(JNIEnv *env, jobject this,
jintArray intArray, jboolean block)
{
int fd[2];
jint *ptr = 0;

if (pipe(fd) < 0) {
JNU_ThrowIOExceptionWithLastError(env, "Pipe failed");
return;
}
if (block == JNI_FALSE) {
if ((configureBlocking(fd[0], JNI_FALSE) < 0)
|| (configureBlocking(fd[1], JNI_FALSE) < 0)) {
JNU_ThrowIOExceptionWithLastError(env, "Configure blocking failed");
}
}
ptr = (*env)->GetPrimitiveArrayCritical(env, intArray, 0);
ptr[0] = fd[0];
ptr[1] = fd[1];
(*env)->ReleasePrimitiveArrayCritical(env, intArray, ptr, 0);
}

ServerSocketChannel.open()

java.nio.channels.ServerSocketChannel#open

1
2
3
public static ServerSocketChannel open() throws IOException {
return SelectorProvider.provider().openServerSocketChannel();
}

openServerSocketChannel方法也和openPipe方法一样由父类SelectorProviderImpl来实现的

sun.nio.ch.SelectorProviderImpl#openServerSocketChannel

1
2
3
public ServerSocketChannel openServerSocketChannel() throws IOException {
return new ServerSocketChannelImpl(this);
}

sun.nio.ch.ServerSocketChannelImpl

1
2
3
4
5
6
7
8
9
10
11
12
ServerSocketChannelImpl(SelectorProvider sp) throws IOException {
//调用父类的构造方法将SelectorProvider这个对象
//初始化到父类的父类AbstractSelectableChannel中的
super(sp);
//创建了一个文件描述符对象设置为成员属性,注意这里传入的参数是true
//代表创建的ServerSocketChannel对应的文件描述符对象是阻塞的
this.fd = Net.serverSocket(true);
//设置fdVal值,保存的是内核中文件当前文件描述符对象的index
this.fdVal = IOUtil.fdVal(fd);
//设置一个ServerSocketChannel的状态
this.state = ST_INUSE;
}

server.configureBlocking(false);

java.nio.channels.spi.AbstractSelectableChannel#configureBlocking

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public final SelectableChannel configureBlocking(boolean block) throws IOException{
synchronized (regLock) {
if (!isOpen())
throw new ClosedChannelException();
if (blocking == block)
return this;
if (block && haveValidKeys())
throw new IllegalBlockingModeException();
//关键点
implConfigureBlocking(block);
blocking = block;
}
return this;
}

sun.nio.ch.ServerSocketChannelImpl#implConfigureBlocking

1
2
3
4
protected void implConfigureBlocking(boolean block) throws IOException {
//将当前ServerSocketChannel设置为非阻塞的模式
IOUtil.configureBlocking(fd, block);
}

server.socket();

sun.nio.ch.ServerSocketChannelImpl#socket

1
2
3
4
5
6
7
8
9
public ServerSocket socket() {
synchronized (stateLock) {
//ServerSocketImpl中并没有看到创建ServerSocket对象的语句
//所以if (socket == null)的判断肯定是返回的true
if (socket == null)
socket = ServerSocketAdaptor.create(this);
return socket;
}
}

sun.nio.ch.ServerSocketAdaptor#create

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public static ServerSocket create(ServerSocketChannelImpl ssc) {
try {
return new ServerSocketAdaptor(ssc);
} catch (IOException x) {
throw new Error(x);
}
}
//ServerSocketAdaptor类的构造方法如下
//创建的ServerSocketAdaptor中保存了ServerSocketChannelImpl的引用
//ServerSocketAdaptor类是ServerSocket的子类
//它覆盖了父类中的bind,accept,close等方法
private ServerSocketAdaptor(ServerSocketChannelImpl ssc) throws IOException{
this.ssc = ssc;
}

socket.bind

sun.nio.ch.ServerSocketAdaptor#bind

1
2
3
4
5
6
7
8
9
10
11
12
13
public void bind(SocketAddress local) throws IOException {
bind(local, 50);
}

public void bind(SocketAddress local, int backlog) throws IOException {
if (local == null)
local = new InetSocketAddress(0);
try {
ssc.bind(local, backlog);
} catch (Exception x) {
Net.translateException(x);
}
}

sun.nio.ch.ServerSocketChannelImpl#bind

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
@Override
public ServerSocketChannel bind(SocketAddress local, int backlog) throws IOException {
synchronized (lock) {
if (!isOpen())
throw new ClosedChannelException();
if (isBound())
throw new AlreadyBoundException();
InetSocketAddress isa = (local == null) ? new InetSocketAddress(0) :
Net.checkAddress(local);
SecurityManager sm = System.getSecurityManager();
if (sm != null)
sm.checkListen(isa.getPort());
NetHooks.beforeTcpBind(fd, isa.getAddress(), isa.getPort());
//关键点
//底层调用的是native方法
Net.bind(fd, isa.getAddress(), isa.getPort());
Net.listen(fd, backlog < 1 ? 50 : backlog);
synchronized (stateLock) {
localAddress = Net.localAddress(fd);
}
}
return this;
}

server.register

往Selector注册Channel,通道触发了一个事件意味着该事件已经就绪

java.nio.channels.SelectableChannel#register

  • 可以注册四种事件,java.nio.channels.SelectionKey内定义:
    • OP_READ(1)
      • 一个有数据可读的通道
    • OP_WRITE(4)
      • 等待写数据的通道
    • OP_CONNECT(8)
      • 某个channel成功连接到另一个服务器
    • OP_ACCEPT(16)
      • 一个server socket channel准备好接收新进入的连接
  • 注册多个事件,用“位或|”
    • int interestSet = SelectionKey.OP_READ | SelectionKey.OP_WRITE;
1
2
3
4
5
6
7
8
9
//SelectionKey包含:
//interest集合
//ready集合
//Channel
//Selector
//附加的对象(可选)
public final SelectionKey register(Selector sel, int ops) throws ClosedChannelException{
return register(sel, ops, null);
}

java.nio.channels.spi.AbstractSelectableChannel#register

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
// SelectionKey保存注册时的channel、selector、event
// 以及保存在pollWrapper的偏移位置index
public final SelectionKey register(Selector sel, int ops,Object att) throws ClosedChannelException {
synchronized (regLock) {
if (!isOpen())
throw new ClosedChannelException();
if ((ops & ~validOps()) != 0)
throw new IllegalArgumentException();
if (blocking)
throw new IllegalBlockingModeException();
SelectionKey k = findKey(sel);
// 1 如果该channel和selector已经注册过,则直接添加感兴趣的事件和附件
if (k != null) {
k.interestOps(ops);
k.attach(att);
}
// 2 否则通过selector实现注册过程,调用select的regist
if (k == null) {
// New registration
synchronized (keyLock) {
if (!isOpen())
throw new ClosedChannelException();
//关键点
//调用select的regist
k = ((AbstractSelector)sel).register(this, ops, att);
addKey(k);
}
}
return k;
}
}

sun.nio.ch.SelectorImpl#register

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
protected final SelectionKey register(AbstractSelectableChannel ch,int ops,Object attachment){
if (!(ch instanceof SelChImpl))
throw new IllegalSelectorException();
//1.创建了一个SelectionKeyImpl对象
//注册感兴趣事件和附件
SelectionKeyImpl k = new SelectionKeyImpl((SelChImpl)ch, this);
//调用attach方法将传入的附件类attachment放入到SelectionKeyImpl的属性中
k.attach(attachment);
synchronized (publicKeys) {
implRegister(k);
}
//调用SelectionKeyImpl的interestOps方法将感兴趣的操作(SelectionKey.OP_ACCEPT)传入
//实际上这个interestOps方法也是将这个值放到了KQueueSelectorImpl的pollWrapper属性中
k.interestOps(ops);
return k;
}
implRegister

sun.nio.ch.SelectionKeyImpl

1
2
3
4
5
6
7
8
9
final SelChImpl channel;                            // package-private
public final SelectorImpl selector;
//保存了Selector的实现类和ServerSocketChannel的实现类
//这个构造方法中接受的第一个参数是SelChImpl类
//那么ServerSocketChannel的实现类肯定实现了这个类
SelectionKeyImpl(SelChImpl ch, SelectorImpl sel) {
channel = ch;
selector = sel;
}

接着调用implRegister,这个方法的实现就是Selector的实现类,系统不同,实现类不同。

PollSelectorImpl(Linux),WindowsSelectorImpl(Windows),KQueueSelectorImpl(Mac)

sun.nio.ch.AbstractPollSelectorImpl#implRegister

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
protected void implRegister(SelectionKeyImpl ski) {
synchronized (closeLock) {
if (closed)
throw new ClosedSelectorException();
// Check to see if the array is large enough
//如果totalChannels的channel数量达到了channelArray的长度(默认是8)
//查看pollWrapper中的pollfd数组是否足够大
if (channelArray.length == totalChannels) {
// Make a larger array
//那么channelArray 急需要扩充,扩充为2倍
int newSize = pollWrapper.totalChannels * 2;
SelectionKeyImpl temp[] = new SelectionKeyImpl[newSize];
// Copy over
for (int i=channelOffset; i<totalChannels; i++)
temp[i] = channelArray[i];
channelArray = temp;
// Grow the NativeObject poll array
//同时保存文件描述符对象的PollArrayWrapper类也需要扩充
pollWrapper.grow(newSize);
}
//2.把新建的SelectionKey添加到pollWrapper的channel数组
channelArray[totalChannels] = ski;
//设置SelectionKeyImpl对象的index属性也就是他在channelArray数组中的位置
ski.setIndex(totalChannels);
//将SelectionKeyImpl对象的channel放到pollWrapper中
pollWrapper.addEntry(ski.channel);
//totalChannels自增
totalChannels++;
//将SelectionKeyImpl对象放到keys属性中
keys.add(ski);
}
}
interestOps

sun.nio.ch.SelectionKeyImpl#interestOps

1
2
3
4
public SelectionKey interestOps(int ops) {
ensureValid();
return nioInterestOps(ops);
}

sun.nio.ch.SelectionKeyImpl#nioInterestOps

1
2
3
4
5
6
7
public SelectionKey nioInterestOps(int ops) {
if ((ops & ~channel().validOps()) != 0)
throw new IllegalArgumentException();
channel.translateAndSetInterestOps(ops, this);
interestOps = ops;
return this;
}

SelectorServerSocketChannel都创建好了,通过ServerSocketChannelregister方法把二者绑定在一起,也就是把创建ServerSocketChannel时创建的FD(file descriptor)与Selector绑定在了一起。

至此,server端已经启动完成了。

selector.select()

监听哪些Channel已经就绪

selectNow的选择过程是非阻塞的,select(timeout)和select()的选择过程是阻塞的

sun.nio.ch.SelectorImpl#select

1
2
3
4
5
6
7
8
9
10
11
12
//返回值表示有多少通道已经就绪
//select()底层调用为select(0)
public int select(long timeout) throws IOException {
if (timeout < 0)
throw new IllegalArgumentException("Negative timeout");
//关键点
return lockAndDoSelect((timeout == 0) ? -1 : timeout);
}

public int select() throws IOException {
return select(0);
}

sun.nio.ch.SelectorImpl#lockAndDoSelect

1
2
3
4
5
6
7
8
9
10
11
12
private int lockAndDoSelect(long timeout) throws IOException {
synchronized (this) {
if (!isOpen())
throw new ClosedSelectorException();
synchronized (publicKeys) {
synchronized (publicSelectedKeys) {
//关键点
return doSelect(timeout);
}
}
}
}

sun.nio.ch.PollSelectorImpl#doSelect

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
protected int doSelect(long timeout)
throws IOException
{
if (channelArray == null)
throw new ClosedSelectorException();
//处理已经不监听的事件(文件描述符或Channel)
processDeregisterQueue();
try {
//标志开始一个可能会被中断的IO操作
begin();
//核心
//底层调用native方法poll0
//本质上是调用了系统的epoll方法
//调用native方法epoll获取已经就绪的pollfd
//totalChannels:
//The number of valid channels in this Selector's poll array
pollWrapper.poll(totalChannels, 0, timeout);
} finally {
end();
}
//处理已经不监听的事件(文件描述符或Channel)
processDeregisterQueue();
//2 获取就绪的Key的数目,并且将就绪的Key赋值给selector的selectedKey
int numKeysUpdated = updateSelectedKeys();
if (pollWrapper.getReventOps(0) != 0) {
// Clear the wakeup pipe
//清除wakeup通道
pollWrapper.putReventOps(0, 0);
synchronized (interruptLock) {
IOUtil.drain(fd0);
interruptTriggered = false;
}
}
return numKeysUpdated;
}

sun.nio.ch.PollArrayWrapper#poll

1
2
3
4
5
6
7
int poll(int numfds, int offset, long timeout) {
//SIZE_POLLFD:8
return poll0(pollArrayAddress + (offset * SIZE_POLLFD),
numfds, timeout);
}
//这一方法是一个native方法,本质上是调用了系统的epoll方法
private native int poll0(long pollAddress, int numfds, long timeout);

如果有已经就绪的pollfd,poll0()就会返回,没有则一直阻塞,也就是selector.select()会一直阻塞。poll0()返回后,selector.select()就会返回,所以在监听时要用while(true),这样就可以保证在selector接收到数据并处理完后继续监听poll()。

其它

selector.wakeUp()

个线程调用select()方法后阻塞了,即使没有通道已经就绪,也有办法让其从select()方法返回。只要让其它线程在第一个线程调用select()方法的那个对象上调用Selector.wakeup()方法即可。阻塞在select()方法上的线程会立马返回。

如果有其它线程调用了wakeup()方法,但当前没有线程阻塞在select()方法上,下个调用select()方法的线程会立即“醒来(wake up)”。

wakeup()是通过pipe发送一个字节来唤醒poll().

selector.close()

用完Selector后调用其close()方法会关闭该Selector,且使注册到该Selector上的所有SelectionKey实例无效。通道本身并不会关闭。

查找native方法源码

一般,从JDK目录下的native目录(可能跟OS平台相关)根据包名路径去找相关的Class名.c文件即可找到对应的native方法。

如果间接调用了hotspot的实现(jvm会以动态库的形式被加载,prims定义了hotspot与其它模块的接口及实现),那么从share/vm/prims/jvm.cpp文件可找到JVM_XXX函数的实现。

NIO & AIO 对比

属性 同步阻塞IO(BIO) 伪异步IO 非阻塞IO(NIO) 异步IO(AIO)
客户端数:IO线程数 1:1 M:N(M>N) M:1 M:0
阻塞类型 阻塞 阻塞 非阻塞 非阻塞
同步 同步 同步 同步(多路复用) 异步
API使用难度 简单 简单 复杂 复杂
调试难度 简单 简单 复杂 复杂
可靠性 非常差
吞吐量

参考资料

grepcode-openjdk

hg.openjdk.java.net

eclipse插件

Java Source Attacher