本篇文章为你整理了Netty(1)——NIO基础(netty nioeventloop)的详细内容,包含有netty入门教程 netty nioeventloop netty快速入门 netty oom Netty(1)——NIO基础,希望能帮助你了解 Netty(1)——NIO基础。
Netty是由JBOSS提供的Java开源网络应用程序框架,其底层是基于Java提供的NIO能力实现的。因此为了掌握Netty的底层原理,需要首先了解Java NIO的原理。
NIO简介
计算机主要由CPU、内存、外存、IO设备等硬件组成,计算机执行计算的过程就是CPU从内存中获取数据,进行计算,然后再将计算结果写入内存中。但由于内存非常昂贵且下电后数据会丢失,计算机需要使用外存来持久化存储大规模的数据,外存提供了大量的存储空间,代价是其存取速度远小于内存。除了读取外存数据,计算机还可以从网络设备获取网络中的数据,受网络传输速度的限制,计算机获取网络数据的速度也远小于其读取内存的速度。
对于存取这些低速IO设备,操作系统(如Linux)提供了5种不同的IO模型。对于Java来说,其最先提供的就是基于最简单的阻塞式IO模型实现的BIO(Blocking IO)库。当调用BIO库读取硬盘中的数据时,用户进程会一直被阻塞在读数据的接口上,直到数据被操作系统从硬盘中获取出来并返回给用户,这时用户进程才能继续向下执行。由于读取硬盘速度相比CPU计算速度慢很多,进程就会一直被卡在读取数据这里,用户体验就是进程没有响应。即使CPU处于空闲状态,也无法使用CPU进行其他工作。这就浪费了大量的资源,同时也给用户造成了不好的体验。
除了最传统的阻塞式IO,操作系统还提供了其他几种改进的IO模型,总体思想都是尽可能减少用户进程阻塞在IO上的时间,在进行慢速设备IO时,进程无需等待,可以继续处理其他指令,当数据获取完成时操作系统再通知用户进程可以进行后续的数据处理操作。因此Java在1.4版本后就推出了一套新的IO接口NIO(New IO),这套IO接口基于多路复用IO模型,提供了非阻塞的IO能力,因此NIO中的N也可以理解为Non-blocking。这套NIO接口实现了只用一个线程来轮询等待所有应用进程的IO就绪状态,当某个应用进程的IO状态就绪时,再通知对应进程进行数据读写的操作。这就避免了每个应用进程在IO时被阻塞,为开发高性能和高并发的应用提供了关键能力。
NIO的3个核心组件
Channel
Buffer
Selector
Channel(通道)
Channel 是 NIO 的核心概念,它表示一个打开的连接,是数据读写的双向通道,这个连接可以连接到 I/O 设备(例如:磁盘文件,Socket)或者一个支持 I/O 访问的应用程序,Java NIO 使用缓冲区和通道来进行数据传输。
Channel的主要实现类有:
FileChannel(读写文件)
DatagramChannel(UDP编程)
SocketChannel(TCP编程)
ServerSocketChannel(TCP编程)
FileChannel
FileChannel只能工作在阻塞模式下,不能配合Selector
FileChannel不能直接打开,必须通过FileInputStream,FileOutputStream或RandomAccessFile来获取,它们都有getChannel()方法
通过FileInputStream获取的channel只能读
通过FileOutputStream获取的channel只能写
通过RandomAccessFile是否能读写根据构造时的读写模式决定
transferTo方法
效率相比使用流式方式拷贝数据高很多,底层使用了操作系统提供的零拷贝特性。
一次最多传输2g的数据
Buffer(缓冲区)
Buffer是NIO的另一个核心概念,NIO库操作数据都是通过缓冲区处理的,在数据读写的过程都要先经过缓冲区,然后再从缓冲区中按照块来处理数据。
从类图中可以看到,7 种数据类型对应着 7 种子类,这些名字是 Heap 开头子类,数据是存放在 JVM 堆中的。
MappedByteBuffer
MappedByteBuffer存放在堆外直接内存中,可以与文件进行映射。
通过java.nio包和MappedByteBuffer允许Java程序直接从内存中读取文件内容,通过将整个或部分文件映射到内存,由操作系统来处理加载请求和写入文件,应用只需要和内存打交道,这使得IO操作非常快。
Mmap内存映射和普通标准IO操作的本质区别在于它并不需要将文件中的数据先拷贝至OS的内核IO缓冲区,而是可以直接将用户进程私有地址空间中的一块区域与文件对象建立映射关系,这样程序就好像可以直接从内存中完成对文件读/写操作一样。
ByteBuffer的正确使用方式
向buffer中写入数据,例如channel.read(buf)
调用flip()切换成读模式
从buffer中读取数据,例如buffer.get()
调用clear()或compact()切换为写模式
重复1-4
public static void main(String[] args) {
try (FileInputStream fileInputStream = new FileInputStream("data.txt");
FileChannel channel = fileInputStream.getChannel()) {
ByteBuffer buffer = ByteBuffer.allocate(10);
while (true) {
int len = channel.read(buffer);
if (len 0) {
break;
log.info("读到的字符数:{}", len);
buffer.flip();
while (buffer.hasRemaining()) {
byte b = buffer.get();
log.info("读到的字符:{}", (char) b);
buffer.clear();
} catch (IOException e) {
throw new RuntimeException(e);
Java规定内码使用UTF-16编码,一个字符占用2个字节,也就是Java中的char类型在内存中是使用UTF-16的编码形式存储的。而我们代码中读取的文件data.txt使用的是UTF-8编码格式,因此对于其中的英文字符和数字,一个字符只占用一个字节。因此代码中我们可以每次读取一个字节,然后再把它转换成Java内部的char。
Buffer的结构
Buffer是由特定基本类型的元素组成的线性有限序列,除了Buffer里面的内容,其最重要的属性就是它的capacity,limit和position。
capacity:Buffer中可以储存的元素数量,Buffer的capacity不能为负值也永远不会改变。
limit:Buffer中第一个不能被读取或写入的元素的位置,limit不能为负值也不能大于capacity。
position:Buffer中下一个将要被读取或写入的元素的位置,position不能为负值也不能大于limit。
三个属性是如何控制读写过程的
通过源码可以看出,调用ByteBuffer.allocate(10)的时候,我们初始化了一个HeapByteBuffer对象,并将其capacity和limit均设置为10,position被设置为0。
// HeapByteBuffer.java,调用的HeapByteBuffer中的构造方法
HeapByteBuffer(int cap, int lim) { // package-private
super(-1, 0, lim, cap, new byte[cap], 0);
当调用channel.read(buf)向Buffer中写入数据时,根据源码分析,其最终会调入ByteBuffer的put()方法中,HeapByteBuffer对其的实现如下,nextPutIndex()方法检查当前position是否大于等于limit,如果小于limit,则将原position返回,并将原position加1。
当写入完成后,我们调用flip()方法,所谓将Buffer切换为读模式,其实源码中就是将position和limit的位置重新赋值。如此操作后,position就是我们读取数据的起点,limit就是我们读取数据的终点。
切换读模式后,就可以调用buffer.get()方法来获取一个字节,通过源码可以看出,nextGetIndex()方法检查当前position是否大于等于limit,如果小于limit,则将原位置返回,并将原位置加1。
当读取所有数据后,可以调用buffer.clear()方法或buffer.compact()方法将Buffer切换为写模式。
clear(): 直接将Buffer重置为初始状态,忽略还没有读完的数据。
compact():将还没读完的数据复制到缓冲区头部,然后从没读完的数据后可以开始写入新的数据
public ByteBuffer compact() {
System.arraycopy(hb, ix(position()), hb, ix(0), remaining()); // 将还没读完的数据拷贝到数组头部
position(remaining()); // 将position重置为剩余待读数据之后
limit(capacity()); // 将limit重置为capacity
discardMark();
return this;
mark和reset
mark是Buffer中的另一个属性,它的主要用途是记录一个position的位置,后续调用reset()方法后会将position重置到mark的位置。mark可以不被定义,但如果设置了mark的值,则它不能为负值且不能大于position的值。
粘包和半包
比如我们想要发送三行数据
Hello world.\n
It is my life.\n
I love you.\n
为了提高发送效率,通常我们会将这三行字符串合并到一个Buffer中进行发送。另一端在接收到消息时,由于协议并不理解消息的内容,因此用户在读取数据时,有可能读取出来如下两个包。
Hello world.\nIt is my life.\nI Lo
ve you.\n
这里第一个包出现了原来的两条数据在一个包中的情况,这就叫做粘包。第一行最后的数据将原来的一条数据截断了,这就叫做半包。
我们可以通过如下方式处理粘包和半包的问题。
点击查看代码
public class TestStickyAndHalfPackage {
public static void main(String[] args) {
ByteBuffer p1 = ByteBuffer.allocate(64);
p1.put("Hello world.\nIt is my life.\nI lo".getBytes());
split(p1);
p1.put("ve you.\n".getBytes());
split(p1);
private static void split(ByteBuffer buffer) {
buffer.flip();
for (int i = 0; i buffer.limit(); ++i) {
if (buffer.get(i) == \n) {
int len = i + 1 - buffer.position();
ByteBuffer line = ByteBuffer.allocate(len);
for (int j = 0; j len; ++j) {
line.put(buffer.get());
line.flip();
System.out.print(StandardCharsets.UTF_8.decode(line));
buffer.compact(); // 将没有读完的数据移动到buffer头部,这里是处理半包和粘包的关键
Selector(选择器)
概念继承于操作系统IO模型中多路复用IO模型中的selector,其主要作用是,用户可以把所有读写Channel都注册在某个Selector上,Selector会不断的轮询注册在上面的所有channel,如果某个channel为读写等事件做好准备,那么就处于就绪状态,通过Selector可以不断轮询发现出就绪的channel,进行后续的IO操作。为何要做这种设计呢?
如果每一个Channel都需要一个线程来为其IO过程提供服务,则会占用大量的内存,CPU需要在很多线程间进行切换,有太多额外开销,而且随着连接数量增加,线程数量会达到上限,无法支持大连接数。
有一种解决方案是使用有固定线程数量的线程池来处理所有连接请求,但线程池中的线程一旦被占用,就要阻塞等待IO完成才能被其他连接使用,如果IO请求花费时间很长,那会导致后续的大量IO请求需要排队等待。这种情况只适合处理短连接比较多的场景。
针对连接数量非常多,数据流量比较少的场景,多路复用的IO模型就比较适合。如下图所示,每一个Channel可以把自己注册到一个独立运行的Selector线程中,这个Selector线程会轮询所有Channel的读写状态,当发现一个就绪的Channel时,就可以使用工作线程为这个Channel提供服务。这样工作线程就不需要阻塞在某一个Channel上,只有真正要进行数据读写时才分配给某个Channel,极大提高了线程的利用率。
NIO消息服务器示例
点击查看代码
@Slf4j
public class Server {
public static void main(String[] args) {
try (Selector selector = Selector.open(); ServerSocketChannel scc = ServerSocketChannel.open();) {
scc.configureBlocking(false);
scc.bind(new InetSocketAddress(8080));
final SelectionKey sccKey = scc.register(selector, 0, null);
sccKey.interestOps(SelectionKey.OP_ACCEPT); // 配置这个channel只关注accept事件
while (true) {
selector.select();
final Iterator SelectionKey iter = selector.selectedKeys().iterator(); // 每次准备就绪的channel会被放到selectedKeys这个集合中,但删除,需要手动删除
while (iter.hasNext()) {
final SelectionKey key = iter.next();
log.debug("key: {}", key);
iter.remove(); // 手动从集合中删除处理过的key
if (key.isAcceptable()) {
final ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
final SocketChannel sc = ssc.accept();
sc.configureBlocking(false); // 使用selector时,channel都需要配置成非阻塞的
log.debug("accept: {}", sc);
final SelectionKey scKey = sc.register(selector, 0, null);
scKey.interestOps(SelectionKey.OP_READ);
} else if (key.isReadable()) {
try {
final SocketChannel sc = (SocketChannel) key.channel();
ByteBuffer buffer = ByteBuffer.allocate(16);
final int read = sc.read(buffer);
if (read 0) { // 客户端调用close()正常关闭时,会生成一次READ事件,实际read的返回值是-1,这里处理客户端正常退出
log.warn("cancel {}", key);
key.cancel();
} else {
buffer.flip();
System.out.println(StandardCharsets.UTF_8.decode(buffer));
log.debug("after read: {}", sc);
} catch (IOException e) { // 这里处理客户端异常退出
e.printStackTrace();
key.cancel();
} catch (IOException e) {
e.printStackTrace();
处理消息边界问题
以上就是Netty(1)——NIO基础(netty nioeventloop)的详细内容,想要了解更多 Netty(1)——NIO基础的内容,请持续关注盛行IT软件开发工作室。
郑重声明:本文由网友发布,不代表盛行IT的观点,版权归原作者所有,仅为传播更多信息之目的,如有侵权请联系,我们将第一时间修改或删除,多谢。