1. NIO Programming
1.1 The Three Core Components
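NIO: non-blocking I/O. (The java.nio package name officially stands for "New I/O".) Its three core components are Channel, Buffer, and Selector.
Drawbacks of traditional (blocking) network I/O:
- With one thread per connection:
  - High memory usage
  - High thread context-switching cost
  - Only suitable when the number of connections is small
- With a thread pool:
  - Only suitable for short-lived connections
  - In blocking mode, each thread can still handle only one socket connection at a time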
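With a selector, a single thread can monitor many channels and only does work on the ones that are ready for I/O:
    graph TD
    subgraph selector version
    thread --> selector
    selector --> c1(channel)
    selector --> c2(channel)
    selector --> c3(channel)
    end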
1.2 ByteBuffer
Reading a file's contents with FileChannel:
    import java.io.IOException;
    import java.io.RandomAccessFile;
    import java.nio.ByteBuffer;
    import java.nio.channels.FileChannel;
    import lombok.extern.slf4j.Slf4j;

    @Slf4j
    public class ChannelDemo1 {
        public static void main(String[] args) {
            try (RandomAccessFile file = new RandomAccessFile("helloword/data.txt", "rw")) {
                FileChannel channel = file.getChannel();
                ByteBuffer buffer = ByteBuffer.allocate(10);
                do {
                    // Read from the channel, i.e. write into the buffer
                    int len = channel.read(buffer);
                    log.debug("bytes read: {}", len);
                    if (len == -1) { // end of file
                        break;
                    }
                    // Switch the buffer to read mode
                    buffer.flip();
                    while (buffer.hasRemaining()) {
                        log.debug("{}", (char) buffer.get());
                    }
                    // Switch the buffer back to write mode
                    buffer.clear();
                } while (true);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
Common methods:
Allocating space:
    ByteBuffer buf = ByteBuffer.allocate(16);
Writing data into the buffer:
    // Either read from a channel into the buffer...
    int readBytes = channel.read(buf);
    // ...or call put directly
    buf.put((byte) 127);
Reading data out of the buffer:
    // Either write the buffer's contents to a channel...
    int writeBytes = channel.write(buf);
    // ...or call get directly
    byte b = buf.get();
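The write-then-read cycle above is easier to follow when position and limit are printed at each step. The following is a minimal sketch, not code from the original examples: the class name `BufferModeSketch` and its methods are hypothetical, and `compact()` (a standard ByteBuffer method not discussed above) is included only to contrast it with `clear()`.

```java
import java.nio.ByteBuffer;

public class BufferModeSketch {
    /** Formats the buffer's cursor state as "position/limit". */
    static String state(ByteBuffer buf) {
        return buf.position() + "/" + buf.limit();
    }

    /** Walks a 16-byte buffer through write mode, read mode, and back. */
    public static String[] walkThrough() {
        ByteBuffer buf = ByteBuffer.allocate(16);
        String afterAllocate = state(buf);   // write mode: limit == capacity

        buf.put((byte) 'a').put((byte) 'b').put((byte) 'c');
        String afterPut = state(buf);        // position advanced by 3

        buf.flip();                          // limit = old position, position = 0
        String afterFlip = state(buf);

        buf.get();                           // consumes 'a'
        String afterGet = state(buf);

        buf.compact();                       // keeps unread 'b','c', back to write mode
        String afterCompact = state(buf);

        buf.clear();                         // discards everything, back to write mode
        String afterClear = state(buf);

        return new String[] {
            afterAllocate, afterPut, afterFlip, afterGet, afterCompact, afterClear
        };
    }

    public static void main(String[] args) {
        System.out.println(String.join(" ", walkThrough()));
    }
}
```

The design point is that a ByteBuffer has one cursor shared by reads and writes, so forgetting `flip()` before reading, or `clear()`/`compact()` before writing again, is the most common source of empty or garbled output.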
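The get method advances position (the read cursor). To read the same data again:
- call rewind to reset position to 0
- or call get(int i) to fetch the byte at index i, which does not move the read cursor
mark and reset: mark records the current position, and reset moves position back to the marked position.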
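The cursor methods above (get, rewind, get(int), mark, reset) can be sketched on a tiny buffer. This is an illustrative example only: the class name `BufferCursorSketch` and the sample content "abcd" are assumptions made for the demonstration.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class BufferCursorSketch {
    /** Collects the characters returned by a sequence of cursor operations. */
    public static String demo() {
        // wrap() yields a buffer with position 0 and limit 4, ready for reading
        ByteBuffer buf = ByteBuffer.wrap("abcd".getBytes(StandardCharsets.US_ASCII));
        StringBuilder out = new StringBuilder();

        out.append((char) buf.get());    // reads index 0, position becomes 1
        out.append((char) buf.get());    // reads index 1, position becomes 2

        buf.rewind();                    // position back to 0
        out.append((char) buf.get());    // reads index 0 again, position becomes 1

        out.append((char) buf.get(3));   // absolute get: reads index 3, position stays 1

        buf.mark();                      // remember position 1
        out.append((char) buf.get());    // reads index 1
        out.append((char) buf.get());    // reads index 2
        buf.reset();                     // position back to the mark (1)
        out.append((char) buf.get());    // reads index 1 again

        return out.toString();
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Note that `rewind()` also discards any existing mark, which is why the sketch calls `mark()` after `rewind()` rather than before.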
Converting between String and ByteBuffer:
    // String -> ByteBuffer (the two calls are equivalent)
    ByteBuffer buffer1 = StandardCharsets.UTF_8.encode("你好");
    ByteBuffer buffer2 = Charset.forName("utf-8").encode("你好");
    // ByteBuffer -> String
    String str1 = StandardCharsets.UTF_8.decode(buffer1).toString();
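One detail worth spelling out is which conversions leave the buffer ready for reading. The sketch below compares three routes; the class name `StringBufferSketch` and its method names are hypothetical. `Charset.encode` and `ByteBuffer.wrap` both return a buffer positioned at 0, whereas a buffer filled with `put` is still in write mode and needs `flip()` before decoding.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class StringBufferSketch {
    /** Charset.encode: the returned buffer is already positioned for reading. */
    public static String viaEncode(String s) {
        ByteBuffer buf = StandardCharsets.UTF_8.encode(s);
        return StandardCharsets.UTF_8.decode(buf).toString();
    }

    /** ByteBuffer.wrap: also ready for reading, no flip needed. */
    public static String viaWrap(String s) {
        ByteBuffer buf = ByteBuffer.wrap(s.getBytes(StandardCharsets.UTF_8));
        return StandardCharsets.UTF_8.decode(buf).toString();
    }

    /** put: leaves the buffer in write mode, so flip() before decoding. */
    public static String viaPut(String s) {
        byte[] bytes = s.getBytes(StandardCharsets.UTF_8);
        ByteBuffer buf = ByteBuffer.allocate(bytes.length);
        buf.put(bytes);
        buf.flip(); // without this, decode would see zero remaining bytes
        return StandardCharsets.UTF_8.decode(buf).toString();
    }

    /** Number of UTF-8 bytes a string occupies once encoded. */
    public static int encodedLength(String s) {
        return StandardCharsets.UTF_8.encode(s).remaining();
    }
}
```

For example, `viaPut("你好")` returns the original string, and `encodedLength("你好")` is 6 because each of the two characters takes three bytes in UTF-8.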
1.3 Scattering Reads
A scattering read fills several buffers from one channel in a single call, which is useful when the data to be read consists of several parts.
    try (RandomAccessFile file = new RandomAccessFile("helloword/3parts.txt", "rw")) {
        FileChannel channel = file.getChannel();
        ByteBuffer a = ByteBuffer.allocate(3);
        ByteBuffer b = ByteBuffer.allocate(3);
        ByteBuffer c = ByteBuffer.allocate(5);
        // Fill a, then b, then c, in order
        channel.read(new ByteBuffer[]{a, b, c});
        // Switch each buffer to read mode
        a.flip();
        b.flip();
        c.flip();
    } catch (IOException e) {
        e.printStackTrace();
    }
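The snippet above depends on a file that is not shown, so here is a self-contained sketch of the same idea. It is built on assumptions: the class name `ScatteringReadSketch` is hypothetical, and the content "onetwothree" (3 + 3 + 5 bytes, matching the buffer sizes) is a guess at what such a file could contain, written to a temporary file first.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public class ScatteringReadSketch {
    /** Writes "onetwothree" to a temp file and scatter-reads it into three buffers. */
    public static String[] readParts() throws IOException {
        Path tmp = Files.createTempFile("3parts", ".txt");
        try {
            // Assumed sample content: three parts of 3, 3 and 5 bytes
            Files.write(tmp, "onetwothree".getBytes(StandardCharsets.US_ASCII));

            ByteBuffer a = ByteBuffer.allocate(3);
            ByteBuffer b = ByteBuffer.allocate(3);
            ByteBuffer c = ByteBuffer.allocate(5);
            ByteBuffer[] parts = {a, b, c};

            try (FileChannel channel = FileChannel.open(tmp, StandardOpenOption.READ)) {
                // One call usually fills all buffers, but a short read is allowed,
                // so loop until the last buffer is full or end of file is reached
                while (c.hasRemaining() && channel.read(parts) != -1) {
                    // keep reading
                }
            }

            // Switch every buffer to read mode before decoding
            a.flip();
            b.flip();
            c.flip();
            return new String[] {
                StandardCharsets.US_ASCII.decode(a).toString(),
                StandardCharsets.US_ASCII.decode(b).toString(),
                StandardCharsets.US_ASCII.decode(c).toString()
            };
        } finally {
            Files.deleteIfExists(tmp);
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println(String.join(",", readParts()));
    }
}
```

The buffers are filled strictly in array order, so the split points are determined by the buffer capacities rather than by anything in the data.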
1.4 Gathering Writes
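A gathering write sends the contents of several buffers to one channel in a single call, which is useful when the content to be written also consists of several parts.
    ByteBuffer b1 = StandardCharsets.UTF_8.encode("hello");
    ByteBuffer b2 = StandardCharsets.UTF_8.encode("world");
    ByteBuffer b3 = StandardCharsets.UTF_8.encode("你好");
    // ByteBufferUtil is a debugging helper that is not shown in this article
    ByteBufferUtil.debugAll(b1);
    try (FileChannel channel = new RandomAccessFile("words2.txt", "rw").getChannel()) {
        // Write all three buffers, in order, with one call
        channel.write(new ByteBuffer[]{b1, b2, b3});
    } catch (IOException e) {
        // Report the failure instead of silently swallowing it
        e.printStackTrace();
    }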
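To check what a gathering write actually produces, the result can be read back from disk. The following is a minimal sketch under stated assumptions: the class name `GatheringWriteSketch` is hypothetical, it writes to a temporary file rather than words2.txt, and it uses ASCII-only parts to keep the output easy to compare.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public class GatheringWriteSketch {
    /** Gather-writes three buffers to a temp file and returns the file's content. */
    public static String writeAndReadBack() throws IOException {
        Path tmp = Files.createTempFile("words", ".txt");
        try {
            ByteBuffer[] parts = {
                StandardCharsets.UTF_8.encode("hello"),
                StandardCharsets.UTF_8.encode("world"),
                StandardCharsets.UTF_8.encode("!")
            };
            try (FileChannel channel = FileChannel.open(tmp, StandardOpenOption.WRITE)) {
                // Buffers are drained in array order; a write may be partial,
                // so repeat until the last buffer has nothing left
                while (parts[parts.length - 1].hasRemaining()) {
                    channel.write(parts);
                }
            }
            return new String(Files.readAllBytes(tmp), StandardCharsets.UTF_8);
        } finally {
            Files.deleteIfExists(tmp);
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println(writeAndReadBack());
    }
}
```

Compared with copying all parts into one large buffer first, a gathering write avoids the extra copy and the extra allocation.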
1.5 Network Programming
First, NIO in non-blocking mode, implemented by polling in a loop:
    // Shared buffer for reads
    ByteBuffer buffer = ByteBuffer.allocate(16);
    ServerSocketChannel ssc = ServerSocketChannel.open();
    // Non-blocking mode: accept() returns null instead of waiting
    ssc.configureBlocking(false);
    ssc.bind(new InetSocketAddress(8080));
    List<SocketChannel> channels = new ArrayList<>();
    while (true) {
        SocketChannel sc = ssc.accept();
        if (sc != null) {
            log.debug("connected... {}", sc);
            // Non-blocking mode: read() returns 0 when no data is available
            sc.configureBlocking(false);
            channels.add(sc);
        }
        for (SocketChannel channel : channels) {
            int read = channel.read(buffer);
            if (read > 0) {
                buffer.flip();
                // debugRead is a debugging helper that is not shown in this article
                debugRead(buffer);
                buffer.clear();
                log.debug("after read...{}", channel);
            }
        }
    }
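The polling loop above relies on two non-blocking behaviours: `accept()` returns null when no client is waiting, and `read()` returns 0 when a connected client has sent nothing. The sketch below probes both inside one process. It is an illustration only: the class name `NonBlockingProbeSketch` is hypothetical, and it binds to an ephemeral loopback port instead of 8080 so that it does not clash with a running server.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

public class NonBlockingProbeSketch {
    /** Reports what accept() and read() return when there is nothing to do. */
    public static String probe() throws IOException {
        try (ServerSocketChannel ssc = ServerSocketChannel.open()) {
            ssc.configureBlocking(false);
            // Port 0 asks the OS for any free port
            ssc.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));

            // No client has connected yet, so accept() returns immediately with null
            boolean idleAcceptIsNull = ssc.accept() == null;

            // Connect an in-process client that never sends any data
            try (SocketChannel client = SocketChannel.open(ssc.getLocalAddress())) {
                SocketChannel accepted = null;
                long deadline = System.nanoTime() + 2_000_000_000L; // give up after 2 s
                while (accepted == null && System.nanoTime() < deadline) {
                    accepted = ssc.accept(); // poll until the connection shows up
                }
                if (accepted == null) {
                    throw new IOException("client was not accepted in time");
                }
                try (SocketChannel sc = accepted) {
                    sc.configureBlocking(false);
                    // Nothing was sent, so a non-blocking read returns 0
                    int read = sc.read(ByteBuffer.allocate(16));
                    return "idleAcceptIsNull=" + idleAcceptIsNull + ", readWithoutData=" + read;
                }
            }
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println(probe());
    }
}
```

Because neither call ever waits, a loop built on them keeps the CPU busy even when every connection is idle.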
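Next, improve on this with a selector, so that the thread blocks until some channel is ready instead of spinning:
    import java.io.IOException;
    import java.net.InetSocketAddress;
    import java.nio.ByteBuffer;
    import java.nio.channels.SelectionKey;
    import java.nio.channels.Selector;
    import java.nio.channels.ServerSocketChannel;
    import java.nio.channels.SocketChannel;
    import java.util.Iterator;
    import java.util.Set;
    import lombok.extern.slf4j.Slf4j;

    @Slf4j
    public class ChannelDemo6 {
        public static void main(String[] args) {
            try (ServerSocketChannel channel = ServerSocketChannel.open()) {
                channel.bind(new InetSocketAddress(8080));
                Selector selector = Selector.open();
                channel.configureBlocking(false);
                // Ask the selector to report incoming connections
                channel.register(selector, SelectionKey.OP_ACCEPT);

                while (true) {
                    // Blocks until at least one registered channel has an event
                    int count = selector.select();
                    Set<SelectionKey> keys = selector.selectedKeys();

                    Iterator<SelectionKey> iter = keys.iterator();
                    while (iter.hasNext()) {
                        SelectionKey key = iter.next();
                        if (key.isAcceptable()) {
                            ServerSocketChannel c = (ServerSocketChannel) key.channel();
                            SocketChannel sc = c.accept();
                            sc.configureBlocking(false);
                            // Ask the selector to report readable data on this connection
                            sc.register(selector, SelectionKey.OP_READ);
                            log.debug("connection established: {}", sc);
                        } else if (key.isReadable()) {
                            try {
                                SocketChannel sc = (SocketChannel) key.channel();
                                ByteBuffer buffer = ByteBuffer.allocate(128);
                                int read = sc.read(buffer);
                                if (read == -1) {
                                    // The client closed the connection normally
                                    key.cancel();
                                    sc.close();
                                } else {
                                    buffer.flip();
                                }
                            } catch (IOException e) {
                                // The client disconnected abnormally
                                key.cancel();
                            }
                        }
                        // Handled keys must be removed from the selected-key set manually
                        iter.remove();
                    }
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }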
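The read branch above stops at `buffer.flip()` without using the data. As a rough, self-contained sketch of the full accept-then-read path, the example below runs a selector loop and an in-process client together and returns what the loop received. Everything beyond the standard java.nio calls is an assumption: the class name `SelectorReadSketch`, the ephemeral loopback port, the 5-second deadline, and the choice to stop once the expected number of bytes has arrived.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;

public class SelectorReadSketch {
    /** Sends a short message from an in-process client and returns what the selector loop read. */
    public static String receive(String msg) throws IOException {
        byte[] payload = msg.getBytes(StandardCharsets.UTF_8);
        ByteBuffer received = ByteBuffer.allocate(payload.length);

        try (Selector selector = Selector.open();
             ServerSocketChannel server = ServerSocketChannel.open()) {
            server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
            server.configureBlocking(false);
            server.register(selector, SelectionKey.OP_ACCEPT);

            try (SocketChannel client = SocketChannel.open(server.getLocalAddress())) {
                ByteBuffer out = ByteBuffer.wrap(payload);
                while (out.hasRemaining()) {
                    client.write(out);
                }

                long deadline = System.nanoTime() + 5_000_000_000L; // give up after 5 s
                while (received.hasRemaining() && System.nanoTime() < deadline) {
                    selector.select(500); // wait up to 500 ms for an event
                    Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
                    while (iter.hasNext()) {
                        SelectionKey key = iter.next();
                        iter.remove(); // selected keys are never removed automatically
                        if (key.isAcceptable()) {
                            SocketChannel sc = server.accept();
                            if (sc != null) {
                                sc.configureBlocking(false);
                                sc.register(selector, SelectionKey.OP_READ);
                            }
                        } else if (key.isReadable()) {
                            SocketChannel sc = (SocketChannel) key.channel();
                            if (sc.read(received) == -1) { // peer closed the connection
                                key.cancel();
                                sc.close();
                            }
                        }
                    }
                }
            } finally {
                // Closing the selector does not close registered channels, so do it here
                for (SelectionKey key : selector.keys()) {
                    key.channel().close();
                }
            }
        }
        received.flip();
        return StandardCharsets.UTF_8.decode(received).toString();
    }

    public static void main(String[] args) throws IOException {
        System.out.println(receive("hello selector"));
    }
}
```

A real server would not know the message length in advance, so it would need its own framing rule (for example a delimiter or a length prefix) to decide when a message is complete.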