概念
阻塞Block和非阻塞Non-Block
进程在访问数据的时候,数据是否准备就绪的一种处理方式。
- 当数据没有准备的时候
- 阻塞
- 往往需要等待缓冲区中的数据准备好过后才处理其它的事情,否则一直等待
- 非阻塞
IO模型 |
IO |
NIO |
方式 |
从硬盘到内存 |
从内存到硬盘 |
通信 |
面向流(乡村公路) |
面向缓冲区(高速公路,多路复用技术) |
处理 |
阻塞IO(多线程) |
非阻塞IO(反应堆Reactor) |
触发 |
无 |
选择器(轮循机制) |
- 流
- 每次从流中读一个或多个字节,直至读取所有字节,没有被缓存在任何地方。
- 不能前后移动流中的数据
- 需要移动从流中读区的数据,需要先将它缓存到一个缓冲区
- 缓冲
- 数据读取到一个它稍后处理的缓冲区,需要时可在缓冲区中前后移动
- 增加了处理过程中的灵活性
- 需检查是否该缓冲区中包含所有需要处理的数据
- 需确保当跟多的数据读入缓冲区时,不要覆盖缓冲区里尚未处理的数据
同步Synchronization和异步Asynchronous
基于应用程序和操作系统处理IO事件所采用的方式。
- 同步
- 应用程序直接参与IO读写的操作
- 处理IO事件时,必须阻塞在某个方法上等待IO事件完成
- 阻塞IO事件或者通过轮询IO事件的方式
- 阻塞到IO事件,阻塞到read或者write,不能做其它事情
- 读写方法加入到线程里面,通过阻塞线程来实现,对线程的性能开销比较大
- 异步
- 所有的IO读写交给操作系统去处理,应用程序需要等待通知时,可以去做其它事情
选择器Selector
Java NIO的选择器允许一个单独的线程来监视多个输入通道,可以注册多个通道使用一个选择器,然后使用一个单独的线程来“选择”通道。
这些通道里已经有可以处理的输入或者选择已准备写入的通道。
这种选择机制,使得一个单独的线程很容易来管理多个通道。
NIO和IO如何影响应用程序的设计
- 对NIO或IO类的API调用
- 数据处理
- 用来处理数据的线程数
- NIO
- 只使用一个或几个单线程管理多个通道(网络连接或文件)
- 代价是解析数据可能会从一个阻塞流中读取更复杂
1 2 3 4 5 6 7 8 9
| FileInputStream input = new FileInputStream("d://info.txt"); BufferedReader reader = new BufferedReader(new InputStreamReader(input));
String nameLine = reader.readLine();
|
1 2 3 4
|
ByteBuffer buffer = ByteBuffer.allocate(48); int bytesRead = inChannel.read(buffer);
|
API调用
- NIO
- IO
- 从一个InputStream/Reader逐字节读取
核心对象
- 缓冲区Buffer
- 通道Channel
- 选择器Selector
缓冲区实际上是一个容器对象,更直接的说,其实就是一个数组。NIO库中,所有数据都是用缓冲区处理的。从缓冲区读,写入到缓冲区。
所有的缓冲区类型都继承于抽象类Buffer。
- Buffer
- ByteBuffer
- CharBuffer
- DoubleBuffer
- FloatBuffer
- IntBuffer
- LongBuffer
- ShortBuffer
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| import java.nio.IntBuffer; public class TestIntBuffer { public static void main(String[] args) { IntBuffer buffer = IntBuffer.allocate(8); for (int i = 0; i < buffer.capacity(); ++i) { int j = 2 * (i + 1); buffer.put(j); } buffer.flip(); while (buffer.hasRemaining()) { int j = buffer.get(); System.out.print(j + " "); } } }
|
深入剖析Buffer
一个特殊的数组,缓冲区对象内置了一些机制,能够跟踪和记录缓冲区的状态变化情况,如果我们使用 get()方法从缓冲区获取数据,或者使用 put()方法把数据写入缓冲区,都会引起缓冲区状态的变化。
重要属性
- position
- 指定了下一个将要被写入或者读取的元素索引
- 它的值由 get()/put()方法自动更新
- 在新创建一个Buffer对象时,position被初始化为0。
- limit
- 指定还有多少数据需要取出(在从缓冲区写入通道时)
- 或者还有多少空间可以放入数据(在从通道读入缓冲区时)
- capacity
- 指定了可以存储在缓冲区中的最大数据容量
- 实际上,它指定了底层数据的大小
- 或者至少是指定了准许使用的底层数组的容量
0 <=position<=limit<=capacity
都是在JVM允许范围内
例子:
- 创建一个新的容量大小为10的ByteBuffer对象
- 初始化时
- position为0
- limit和capacity被设置为10
- 使用过程中
- capacity不会发生变化
- position和limit会变化
- 读取4个数据
- 从通道读取数据,相当于往缓冲区中写入数据
- position为4
- limit依旧为10
- 读取的数据写入到通道中,相当于从缓冲区中读取数据
- 调用flip()
- 把limit设置为当前的position值
- 把position设置为0
- 可以保证在下一步输出时读取到的是缓冲区的第一个字节
- 调用get()
- 从缓冲区中读取数据写入到输出通道
- position增加
- limit不变
- position不会超过limit
- 读取之前写入到缓冲区的4个字节之后
- 从缓冲区读取数据完毕后
- limit的值仍然保持在调用flip()方法时的值
- 调用clear()能够把所有的状态变化设置为初始化时的值
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50
| import java.io.FileInputStream; import java.nio.*; import java.nio.channels.*;
public class Test {
public static void main(String[] args) throws Exception { FileInputStream fin = new FileInputStream("/Users/2bai/Downloads/test.txt"); FileChannel fc = fin.getChannel(); ByteBuffer buffer = ByteBuffer.allocate(10); output("初始化", buffer); fc.read(buffer); output("调用 read()", buffer); buffer.flip(); output("调用 flip()", buffer); while (buffer.remaining() > 0) { byte b = buffer.get(); } output("调用 get()", buffer);
buffer.clear(); output("调用 clear()", buffer); fin.close(); }
public static void output(String step, Buffer buffer) { System.out.println(step + " : "); System.out.print("capacity: " + buffer.capacity() + ", "); System.out.print("position: " + buffer.position() + ", "); System.out.println("limit: " + buffer.limit()); System.out.println(); } }
|
缓冲区的分配
在创建一个缓冲区对象时,会调用静态方法allocate()来指定缓冲区的容量。其实调用allocate()相当于创建了一个指定大小的数组,并把它包装为缓冲区对象。也可以直接将一个的数组,包装为缓冲区对象。
1 2 3 4 5 6 7 8 9 10
| import java.nio.ByteBuffer; public class BufferWrap { public void myMethod() { ByteBuffer buffer1 = ByteBuffer.allocate(10); byte array[] = new byte[10]; ByteBuffer buffer2 = ByteBuffer.wrap( array ); } }
|
缓冲区分片
在NIO中,除了可以分配或者包装一个缓冲区对象外,还可以根据现有的缓冲区对象来创建一个子缓冲区,即在现有缓冲区上切出一片来作为一个新的缓冲区。
现有的缓冲区与创建的子缓冲区在底层数组层面上是数据共享的。
子缓冲区相当于是现有缓冲区的一个视图窗口。
调用slice()可以创建一个子缓冲区。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34
|
public static void main(String args[]) throws Exception { ByteBuffer buffer = ByteBuffer.allocate(10); for (int i = 0; i < buffer.capacity(); ++i) { buffer.put((byte) i); } buffer.position(3); buffer.limit(7); ByteBuffer slice = buffer.slice(); for (int i = 0; i < slice.capacity(); ++i) { byte b = slice.get(i); b *= 10; slice.put(i, b); } buffer.position(0); buffer.limit(buffer.capacity()); while (buffer.remaining() > 0) { System.out.println(buffer.get()); } }
|
只读缓冲区
只能读取,不能写入。
通过调用asReadOnlyBuffer(),将任何常规缓冲区转换为只读缓冲区,这个方法返回一个与原缓冲区完全相同的缓冲区,并与原缓冲区共享数据,只不过是只读的。
如果原缓冲区的内容发生了变化,只读缓冲区的内容也随之发生变化。
常规缓冲区可以转换为只读缓冲区,
只读缓冲区不能转换为常规缓冲区。
常用于控制权限(回调方法),比如:在将缓冲区的引用传递给某个对象的方法时,你无法确定这个方法是否会修改缓冲区中的数据,这个时候,就可以创建一个只读缓冲区,然后把只读缓冲区的引用传递给那个方法,这样就能保证,使用那个方法的人没办法去修改缓冲区中的数据了。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| public static void main(String args[]) throws Exception { ByteBuffer buffer = ByteBuffer.allocate(10); for (int i = 0; i < buffer.capacity(); ++i) { buffer.put((byte) i); } ByteBuffer readonly = buffer.asReadOnlyBuffer(); for (int i = 0; i < buffer.capacity(); ++i) { byte b = buffer.get(i); b *= 10; buffer.put(i, b); } readonly.position(0); readonly.limit(buffer.capacity()); while (readonly.remaining() > 0) { System.out.println(readonly.get()); } }
|
- 通道是一个对象,可以读取和写入数据。
- 所有数据都通过Buffer对象来处理,不会将字节直接写入通道或直接从通道中读取数据。
- 所有的通道对象都实现了Channel接口。
使用NIO读取数据
- 从FileInputStream获取Channel
- 创建Buffer
- 将数据从Channel读取到Buffer中
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| public static void main(String args[]) { FileInputStream fin = null; try { fin = new FileInputStream("/Users/2bai/Downloads/test.txt"); FileChannel fc = fin.getChannel(); ByteBuffer buffer = ByteBuffer.allocate(1024); fc.read(buffer); buffer.flip(); while (buffer.remaining() > 0) { byte b = buffer.get(); System.out.print(((char) b)); } } catch (Exception e) { e.printStackTrace(); } finally { try { fin.close(); } catch (IOException e) { e.printStackTrace(); } } }
|
使用NIO写入数据
- 从FileInputStream获取Channel
- 创建Buffer
- 将数据从Channel写入到Buffer中
1 2 3 4 5 6 7 8 9
| public static void main(String args[]) throws Exception { FileOutputStream fout = new FileOutputStream("/Users/2bai/Downloads/test.txt"); FileChannel fc = fout.getChannel(); ByteBuffer buffer = ByteBuffer.allocate(1024); buffer.put(Charset.forName("utf8").encode("你好你好你好你好你好")); buffer.flip(); fc.write(buffer); fout.close(); }
|
阻塞I/O通信模型
缺点
- 当客户端多时,会创建大量的处理线程,且每个线程都要占用栈空间和CPU时间
- 阻塞可能带来频繁的上下文切换,且大部分上下文切换可能是无意义的。在这种情况下非阻塞I/O就又了它的应用前景
Java NIO原理及通信模型
- 由一个专门的线程来处理所有的IO事件,并负责分发
- 事件驱动机制
- 线程通讯
- 线程之间通过wait、notify等方式通讯
- 保证每次上下文切换都是有意义的,减少无谓的线程切换
客户端—>Reactor(分发wait/notify)—>处理线程N(read、decode、compute、encode、send)
每个线程的处理流程大概都是读取数据、解码、计算处理、编码、发送相应。
传统的Server/Client模式会基于TPR(Thread per Request),服务器会为每个客户端请求建立一个线程,由该线程单独处理一个客户请求。
问题:线程数量的剧增,大量线程会增大服务器开销。
解决:线程池。
产生问题:线程池中的线程都在处理,超过线程池数量的请求无法处理。
NIO中非阻塞I/O采用了基于Reactor模式的工作方式,I/O调用不会被阻塞,可以注册感兴趣的特定I/O事件,如可读数据到达,新的Socket连接等等,在发生特定事件时,系统再通知。
NIO中实现非阻塞I/O的核心对象就是Selector,Selector就是注册各种I/O事件的地方。而且当事件发生时,就是Selector告诉所发生的事件。
当有读或写等任何注册的事件发生时,可以从Selector中获得相应的SelectorKey,同事从SelectorKey中可以找到发生的事件和该事件所发生的具体SelectableChannel,以获得客户端发送过来的数据。
Select是NIO的核心,底层使用epoll模型。
- 使用NIO中非阻塞I/O编写服务器处理程序
- 向Selector对象注册感兴趣的事件
- 从Selector中获得去感兴趣的事件
- 根据不同的事件进行相应的处理
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91
|
private Selector getSelector() throws IOException { Selector sel = Selector.open(); ServerSocketChannel server = ServerSocketChannel.open(); server.configureBlocking(false); ServerSocket socket = server.socket(); InetSocketAddress address = new InetSocketAddress(port); socket.bind(address); server.register(sel, SelectionKey.OP_ACCEPT); return sel; }
public void listen() { System.out.println("listen on " + port); try { while (true) { selector.select(); Set<SelectionKey> keys = selector.selectedKeys(); Iterator<SelectionKey> iter = keys.iterator(); while (iter.hasNext()) { SelectionKey key = (SelectionKey) iter.next(); iter.remove(); process(key); } } } catch (IOException e) { e.printStackTrace(); } }
private void process(SelectionKey key) throws IOException { if (key.isAcceptable()) { ServerSocketChannel server = (ServerSocketChannel) key.channel(); SocketChannel channel = server.accept(); channel.configureBlocking(false); channel.register(selector, SelectionKey.OP_READ); } else if (key.isReadable()) { SocketChannel channel = (SocketChannel) key.channel(); int len = channel.read(buffer); if (len > 0) { buffer.flip(); String name = new String(buffer.array(), 0, len); SelectionKey sKey = channel.register(selector, SelectionKey.OP_WRITE); sKey.attach(name); } else { channel.close(); } buffer.clear(); } else if (key.isWritable()) { SocketChannel channel = (SocketChannel) key.channel(); String name = (String) key.attachment(); ByteBuffer block = ByteBuffer.wrap(("Hello " + name).getBytes()); if (block != null) { channel.write(block); } else { channel.close(); } } }
|
类
FileChannel
ServerSocketchannel
SocketChannel
ByteBuffer
MappedByteBuffer
方法
wrap
slice
allocate
allocateDirect
asReadOnlyBuffer
动态改变position的值(油表)
put
get
Selector.open()
java.nio.channels.Selector#open
1 2 3
| public static Selector open() throws IOException { return SelectorProvider.provider().openSelector(); }
|
Selector
是调用的SelectorProvider.provider()
拿到的provider
对象创建的ServerSocketChannel
,上文的Selector
、Pipe
都是通过这一个provider
对象创建的。
java.nio.channels.spi.SelectorProvider#provider
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| public static SelectorProvider provider() { synchronized (lock) { if (provider != null) return provider; return AccessController.doPrivileged( new PrivilegedAction<SelectorProvider>() { public SelectorProvider run() { if (loadProviderFromProperty()) return provider; if (loadProviderAsService()) return provider; provider = sun.nio.ch.DefaultSelectorProvider.create(); return provider; } }); } }
|
对于windows操作系统来说他的实现类就是WindowsSelectorProvider
,而WindowsSelectorProvider
类中只是实现了openSelector
方法。
WindowsSelectorProvider#openSelector
1 2 3 4 5 6 7 8
| public AbstractSelector openSelector() throws IOException{ return new WindowsSelectorImpl(this); }
|
WindowsSelectorImpl(SelectorProvider)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| WindowsSelectorImpl(SelectorProvider sp) throws IOException { super(sp); pollWrapper = new PollArrayWrapper(INIT_CAP); wakeupPipe = Pipe.open(); wakeupSourceFd = ((SelChImpl)wakeupPipe.source()).getFDVal(); SinkChannelImpl sink = (SinkChannelImpl)wakeupPipe.sink(); (sink.sc).socket().setTcpNoDelay(true); wakeupSinkFd = ((SelChImpl)sink).getFDVal(); pollWrapper.addWakeupSocket(wakeupSourceFd, 0); }
|
java.nio.channels.Pipe#open
1 2 3
| public static Pipe open() throws IOException { return SelectorProvider.provider().openPipe(); }
|
sun.nio.ch.SelectorProviderImpl#openPipe
1 2 3
| public Pipe openPipe() throws IOException { return new PipeImpl(this); }
|
sun.nio.ch.PipeImpl
1 2 3 4 5 6 7 8 9 10 11
| PipeImpl(SelectorProvider sp) { int[] fdes = new int[2]; IOUtil.initPipe(fdes, true); FileDescriptor sourcefd = new FileDescriptor(); IOUtil.setfdVal(sourcefd, fdes[0]); source = new SourceChannelImpl(sp, sourcefd); FileDescriptor sinkfd = new FileDescriptor(); IOUtil.setfdVal(sinkfd, fdes[1]); sink = new SinkChannelImpl(sp, sinkfd); }
|
sun.nio.ch.IOUtil#initPipe
1
| static native void initPipe(int[] fda, boolean blocking);
|
solaris/native/sun/nio/ch/IOUtil.c
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| JNIEXPORT void JNICALL Java_sun_nio_ch_IOUtil_initPipe(JNIEnv *env, jobject this, jintArray intArray, jboolean block) { int fd[2]; jint *ptr = 0;
if (pipe(fd) < 0) { JNU_ThrowIOExceptionWithLastError(env, "Pipe failed"); return; } if (block == JNI_FALSE) { if ((configureBlocking(fd[0], JNI_FALSE) < 0) || (configureBlocking(fd[1], JNI_FALSE) < 0)) { JNU_ThrowIOExceptionWithLastError(env, "Configure blocking failed"); } } ptr = (*env)->GetPrimitiveArrayCritical(env, intArray, 0); ptr[0] = fd[0]; ptr[1] = fd[1]; (*env)->ReleasePrimitiveArrayCritical(env, intArray, ptr, 0); }
|
ServerSocketChannel.open()
java.nio.channels.ServerSocketChannel#open
1 2 3
| public static ServerSocketChannel open() throws IOException { return SelectorProvider.provider().openServerSocketChannel(); }
|
openServerSocketChannel
方法也和openPipe
方法一样由父类SelectorProviderImpl
来实现的
sun.nio.ch.SelectorProviderImpl#openServerSocketChannel
1 2 3
| public ServerSocketChannel openServerSocketChannel() throws IOException { return new ServerSocketChannelImpl(this); }
|
sun.nio.ch.ServerSocketChannelImpl
1 2 3 4 5 6 7 8 9 10 11 12
| ServerSocketChannelImpl(SelectorProvider sp) throws IOException { super(sp); this.fd = Net.serverSocket(true); this.fdVal = IOUtil.fdVal(fd); this.state = ST_INUSE; }
|
java.nio.channels.spi.AbstractSelectableChannel#configureBlocking
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| public final SelectableChannel configureBlocking(boolean block) throws IOException{ synchronized (regLock) { if (!isOpen()) throw new ClosedChannelException(); if (blocking == block) return this; if (block && haveValidKeys()) throw new IllegalBlockingModeException(); implConfigureBlocking(block); blocking = block; } return this; }
|
sun.nio.ch.ServerSocketChannelImpl#implConfigureBlocking
1 2 3 4
| protected void implConfigureBlocking(boolean block) throws IOException { IOUtil.configureBlocking(fd, block); }
|
server.socket();
sun.nio.ch.ServerSocketChannelImpl#socket
1 2 3 4 5 6 7 8 9
| public ServerSocket socket() { synchronized (stateLock) { if (socket == null) socket = ServerSocketAdaptor.create(this); return socket; } }
|
sun.nio.ch.ServerSocketAdaptor#create
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| public static ServerSocket create(ServerSocketChannelImpl ssc) { try { return new ServerSocketAdaptor(ssc); } catch (IOException x) { throw new Error(x); } }
private ServerSocketAdaptor(ServerSocketChannelImpl ssc) throws IOException{ this.ssc = ssc; }
|
socket.bind
sun.nio.ch.ServerSocketAdaptor#bind
1 2 3 4 5 6 7 8 9 10 11 12 13
| public void bind(SocketAddress local) throws IOException { bind(local, 50); }
public void bind(SocketAddress local, int backlog) throws IOException { if (local == null) local = new InetSocketAddress(0); try { ssc.bind(local, backlog); } catch (Exception x) { Net.translateException(x); } }
|
sun.nio.ch.ServerSocketChannelImpl#bind
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| @Override public ServerSocketChannel bind(SocketAddress local, int backlog) throws IOException { synchronized (lock) { if (!isOpen()) throw new ClosedChannelException(); if (isBound()) throw new AlreadyBoundException(); InetSocketAddress isa = (local == null) ? new InetSocketAddress(0) : Net.checkAddress(local); SecurityManager sm = System.getSecurityManager(); if (sm != null) sm.checkListen(isa.getPort()); NetHooks.beforeTcpBind(fd, isa.getAddress(), isa.getPort()); Net.bind(fd, isa.getAddress(), isa.getPort()); Net.listen(fd, backlog < 1 ? 50 : backlog); synchronized (stateLock) { localAddress = Net.localAddress(fd); } } return this; }
|
server.register
往Selector注册Channel,通道触发了一个事件意味着该事件已经就绪
java.nio.channels.SelectableChannel#register
- 可以注册四种事件,
java.nio.channels.SelectionKey
内定义:
OP_READ
(1)
OP_WRITE
(4)
OP_CONNECT
(8)
OP_ACCEPT
(16)
- 一个server socket channel准备好接收新进入的连接
- 注册多个事件,用“位或|”
int interestSet = SelectionKey.OP_READ | SelectionKey.OP_WRITE;
1 2 3 4 5 6 7 8 9
|
public final SelectionKey register(Selector sel, int ops) throws ClosedChannelException{ return register(sel, ops, null); }
|
java.nio.channels.spi.AbstractSelectableChannel#register
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
|
public final SelectionKey register(Selector sel, int ops,Object att) throws ClosedChannelException { synchronized (regLock) { if (!isOpen()) throw new ClosedChannelException(); if ((ops & ~validOps()) != 0) throw new IllegalArgumentException(); if (blocking) throw new IllegalBlockingModeException(); SelectionKey k = findKey(sel); if (k != null) { k.interestOps(ops); k.attach(att); } if (k == null) { synchronized (keyLock) { if (!isOpen()) throw new ClosedChannelException(); k = ((AbstractSelector)sel).register(this, ops, att); addKey(k); } } return k; } }
|
sun.nio.ch.SelectorImpl#register
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| protected final SelectionKey register(AbstractSelectableChannel ch,int ops,Object attachment){ if (!(ch instanceof SelChImpl)) throw new IllegalSelectorException(); SelectionKeyImpl k = new SelectionKeyImpl((SelChImpl)ch, this); k.attach(attachment); synchronized (publicKeys) { implRegister(k); } k.interestOps(ops); return k; }
|
implRegister
sun.nio.ch.SelectionKeyImpl
1 2 3 4 5 6 7 8 9
| final SelChImpl channel; public final SelectorImpl selector;
SelectionKeyImpl(SelChImpl ch, SelectorImpl sel) { channel = ch; selector = sel; }
|
接着调用implRegister,这个方法的实现就是Selector的实现类,系统不同,实现类不同。
PollSelectorImpl(Linux),WindowsSelectorImpl(Windows),KQueueSelectorImpl(Mac)
sun.nio.ch.AbstractPollSelectorImpl#implRegister
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
| protected void implRegister(SelectionKeyImpl ski) { synchronized (closeLock) { if (closed) throw new ClosedSelectorException(); if (channelArray.length == totalChannels) { int newSize = pollWrapper.totalChannels * 2; SelectionKeyImpl temp[] = new SelectionKeyImpl[newSize]; for (int i=channelOffset; i<totalChannels; i++) temp[i] = channelArray[i]; channelArray = temp; pollWrapper.grow(newSize); } channelArray[totalChannels] = ski; ski.setIndex(totalChannels); pollWrapper.addEntry(ski.channel); totalChannels++; keys.add(ski); } }
|
interestOps
sun.nio.ch.SelectionKeyImpl#interestOps
1 2 3 4
| public SelectionKey interestOps(int ops) { ensureValid(); return nioInterestOps(ops); }
|
sun.nio.ch.SelectionKeyImpl#nioInterestOps
1 2 3 4 5 6 7
| public SelectionKey nioInterestOps(int ops) { if ((ops & ~channel().validOps()) != 0) throw new IllegalArgumentException(); channel.translateAndSetInterestOps(ops, this); interestOps = ops; return this; }
|
Selector
和ServerSocketChannel
都创建好了,通过ServerSocketChannel
的register
方法把二者绑定在一起,也就是把创建ServerSocketChannel
时创建的FD
(file descriptor)与Selector绑定在了一起。
至此,server端已经启动完成了。
selector.select()
监听哪些Channel已经就绪
selectNow的选择过程是非阻塞的,select(timeout)和select()的选择过程是阻塞的
sun.nio.ch.SelectorImpl#select
1 2 3 4 5 6 7 8 9 10 11 12
|
public int select(long timeout) throws IOException { if (timeout < 0) throw new IllegalArgumentException("Negative timeout"); return lockAndDoSelect((timeout == 0) ? -1 : timeout); }
public int select() throws IOException { return select(0); }
|
sun.nio.ch.SelectorImpl#lockAndDoSelect
1 2 3 4 5 6 7 8 9 10 11 12
| private int lockAndDoSelect(long timeout) throws IOException { synchronized (this) { if (!isOpen()) throw new ClosedSelectorException(); synchronized (publicKeys) { synchronized (publicSelectedKeys) { return doSelect(timeout); } } } }
|
sun.nio.ch.PollSelectorImpl#doSelect
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
| protected int doSelect(long timeout) throws IOException { if (channelArray == null) throw new ClosedSelectorException(); processDeregisterQueue(); try { begin(); pollWrapper.poll(totalChannels, 0, timeout); } finally { end(); } processDeregisterQueue(); int numKeysUpdated = updateSelectedKeys(); if (pollWrapper.getReventOps(0) != 0) { pollWrapper.putReventOps(0, 0); synchronized (interruptLock) { IOUtil.drain(fd0); interruptTriggered = false; } } return numKeysUpdated; }
|
sun.nio.ch.PollArrayWrapper#poll
1 2 3 4 5 6 7
| int poll(int numfds, int offset, long timeout) { return poll0(pollArrayAddress + (offset * SIZE_POLLFD), numfds, timeout); }
private native int poll0(long pollAddress, int numfds, long timeout);
|
如果有已经就绪的pollfd,poll0()就会返回,没有则一直阻塞,也就是selector.select()会一直阻塞。poll0()返回后,selector.select()就会返回,所以在监听时要用while(true),这样就可以保证在selector接收到数据并处理完后继续监听poll()。
其它
selector.wakeUp()
个线程调用select()方法后阻塞了,即使没有通道已经就绪,也有办法让其从select()方法返回。只要让其它线程在第一个线程调用select()方法的那个对象上调用Selector.wakeup()方法即可。阻塞在select()方法上的线程会立马返回。
如果有其它线程调用了wakeup()方法,但当前没有线程阻塞在select()方法上,下个调用select()方法的线程会立即“醒来(wake up)”。
wakeup()是通过pipe发送一个字节来唤醒poll().
selector.close()
用完Selector后调用其close()方法会关闭该Selector,且使注册到该Selector上的所有SelectionKey实例无效。通道本身并不会关闭。
查找native方法源码
一般,从JDK目录下的native目录(可能跟OS平台相关)根据包名路径去找相关的Class名.c文件即可找到对应的native方法。
如果间接调用了hotspot的实现(jvm会以动态库的形式被加载,prims定义了hotspot与其它模块的接口及实现),那么从share/vm/prims/jvm.cpp文件可找到JVM_XXX函数的实现。
NIO & AIO 对比
属性 |
同步阻塞IO(BIO) |
伪异步IO |
非阻塞IO(NIO) |
异步IO(AIO) |
客户端数:IO线程数 |
1:1 |
M:N(M>N) |
M:1 |
M:0 |
阻塞类型 |
阻塞 |
阻塞 |
非阻塞 |
非阻塞 |
同步 |
同步 |
同步 |
同步(多路复用) |
异步 |
API使用难度 |
简单 |
简单 |
复杂 |
复杂 |
调试难度 |
简单 |
简单 |
复杂 |
复杂 |
可靠性 |
非常差 |
差 |
高 |
高 |
吞吐量 |
低 |
中 |
高 |
高 |
参考资料
grepcode-openjdk
hg.openjdk.java.net
eclipse插件
Java Source Attacher