Netty

一、NIO编程

1.1 三大组件

NIO:none-blocking io,非阻塞IO

传统的网络IO的缺点:

  • 若采用多线程:
    • 内存占用高
    • 线程上下文切换成本高
    • 只适合连接数小的场景
  • 若采用线程池:
    • 仅适合短连接
    • 阻塞模式下,每个线程只能处理一个socket连接

使用selector:

graph TD
subgraph selector 版
thread --> selector
selector --> c1(channel)
selector --> c2(channel)
selector --> c3(channel)
end

1.2 ByteBuffer

使用FileChannel来读取文件内容:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
@Slf4j
public class ChannelDemo1 {
public static void main(String[] args) {
try (RandomAccessFile file = new RandomAccessFile("helloword/data.txt", "rw")) {
FileChannel channel = file.getChannel();
ByteBuffer buffer = ByteBuffer.allocate(10);
do {
// 向 buffer 写入
int len = channel.read(buffer);
log.debug("读到字节数:{}", len);
if (len == -1) {
break;
}
// 切换 buffer 读模式
buffer.flip();
while(buffer.hasRemaining()) {
log.debug("{}", (char)buffer.get());
}
// 切换 buffer 写模式
buffer.clear();
} while (true);
} catch (IOException e) {
e.printStackTrace();
}
}
}

常见方法:

分配空间:

1
Bytebuffer buf = ByteBuffer.allocate(16);

写入数据:

1
2
3
int readBytes = channel.read(buf);
// 或者
buf.put((byte)127);

读取数据:

1
2
3
int writeBytes = channel.write(buf);
// 或者
byte b = buf.get();

get 方法会让 position 读指针向后走,如果想重复读取数据

  • 可以调用 rewind 方法将 position 重新置为 0
  • 或者调用 get(int i) 方法获取索引 i 的内容,它不会移动读指针

mark 和 reset:

  • mark 是在读取时,做一个标记,即使 position 改变,只要调用 reset 就能回到 mark 的位置。

  • rewind 和 flip 都会清除 mark 位置

字符串与 ByteBuffer 互转:

1
2
3
4
ByteBuffer buffer1 = StandardCharsets.UTF_8.encode("你好");
ByteBuffer buffer2 = Charset.forName("utf-8").encode("你好");

String str1 = StandardCharsets.UTF_8.decode(buffer1).toString();

1.3 Scattering Reads

即分散读取(假如待读取的文件有很多部分)

1
2
3
4
5
6
7
8
9
10
11
12
try (RandomAccessFile file = new RandomAccessFile("helloword/3parts.txt", "rw")) {
FileChannel channel = file.getChannel();
ByteBuffer a = ByteBuffer.allocate(3);
ByteBuffer b = ByteBuffer.allocate(3);
ByteBuffer c = ByteBuffer.allocate(5);
channel.read(new ByteBuffer[]{a, b, c});
a.flip();
b.flip();
c.flip();
} catch (IOException e) {
e.printStackTrace();
}

1.4 Gathering Writes

即集中写入(假如待写入的内容也有很多部分)

1
2
3
4
5
6
7
8
ByteBuffer b1 = StandardCharsets.UTF_8.encode("hello");
ByteBuffer b2 = StandardCharsets.UTF_8.encode("world");
ByteBuffer b3 = StandardCharsets.UTF_8.encode("你好");
ByteBufferUtil.debugAll(b1);
try (FileChannel channel = new RandomAccessFile("words2.txt", "rw").getChannel()) {
channel.write(new ByteBuffer[]{b1, b2, b3});
} catch (IOException e) {
}

1.5 网络编程

首先介绍一下非堵塞模式下的NIO,使用“不断轮询”实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
// 使用 nio 来理解非阻塞模式, 单线程
// 0. ByteBuffer
ByteBuffer buffer = ByteBuffer.allocate(16);
// 1. 创建了服务器
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.configureBlocking(false); // 非阻塞模式
// 2. 绑定监听端口
ssc.bind(new InetSocketAddress(8080));
// 3. 连接集合
List<SocketChannel> channels = new ArrayList<>();
while (true) {
// 4. accept 建立与客户端连接, SocketChannel 用来与客户端之间通信
SocketChannel sc = ssc.accept(); // 非阻塞,线程还会继续运行,如果没有连接建立,但sc是null
if (sc != null) {
log.debug("connected... {}", sc);
sc.configureBlocking(false); // 非阻塞模式
channels.add(sc);
}
for (SocketChannel channel : channels) {
// 5. 接收客户端发送的数据
int read = channel.read(buffer);// 非阻塞,线程仍然会继续运行,如果没有读到数据,read 返回 0
if (read > 0) {
buffer.flip();
debugRead(buffer);
buffer.clear();
log.debug("after read...{}", channel);
}
}
}

接下来使用 selector 进行改进:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
@Slf4j
public class ChannelDemo6 {
public static void main(String[] args) {
try (ServerSocketChannel channel = ServerSocketChannel.open()) {
channel.bind(new InetSocketAddress(8080));
Selector selector = Selector.open();
channel.configureBlocking(false);
channel.register(selector, SelectionKey.OP_ACCEPT);

while (true) {
int count = selector.select();
// 获取所有事件
Set<SelectionKey> keys = selector.selectedKeys();

// 遍历所有事件,逐一处理
Iterator<SelectionKey> iter = keys.iterator();
while (iter.hasNext()) {
SelectionKey key = iter.next();
// 判断事件类型
if (key.isAcceptable()) {
ServerSocketChannel c = (ServerSocketChannel) key.channel();
// 必须处理
SocketChannel sc = c.accept();
sc.configureBlocking(false);
sc.register(selector, SelectionKey.OP_READ);
log.debug("连接已建立: {}", sc);
} else if (key.isReadable()) {
try{
SocketChannel sc = (SocketChannel) key.channel();
ByteBuffer buffer = ByteBuffer.allocate(128);
int read = sc.read(buffer);
if(read == -1) {
key.cancel();
sc.close();
} else {
buffer.flip();
}
} catch (IOException e){
key.cancel
}
}
// 处理完毕,必须将事件移除
iter.remove();
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
}

Netty
https://d4wnnn.github.io/2024/06/26/Dev/Netty/
作者
D4wn
发布于
2024年6月26日
许可协议