6、java API 实战多路复用器()

  本篇文章为你整理了6、java API 实战多路复用器()的详细内容,包含有 6、java API 实战多路复用器,希望能帮助你了解 6、java API 实战多路复用器。

   private ServerSocketChannel server = null;

   private Selector selector = null; //linux 多路复用器(select poll epoll kqueue) nginx event{}

   int port = 9090;

   public void initServer() {

   try {

   server = ServerSocketChannel.open();

   server.configureBlocking(false);

   server.bind(new InetSocketAddress(port));

   //如果在epoll模型下,open--》 epoll_create - fd3

   selector = Selector.open(); // select poll *epoll 优先选择:epoll 但是可以 -D修正,使用poll时候是没有和内核进行交互的,只是在底层的C代码中开辟了用户空间,只有使用epoll才会有系统调用

   //server 约等于 listen状态的 fd4

   register

   select,poll:jvm里开辟一个数组 fd4 放进去,没有系统调用

   epoll: epoll_ctl(fd3,ADD,fd4,EPOLLIN 有系统调用

   server.register(selector, SelectionKey.OP_ACCEPT);

   } catch (IOException e) {

   e.printStackTrace();

   public void start() {

   initServer();

   System.out.println("服务器启动了。。。。。");

   try {

   while (true) { //死循环

   Set SelectionKey keys = selector.keys();

   System.out.println(keys.size()+" size");

   //1,调用多路复用器(select,poll 或者 epoll (epoll_wait))

   select()是啥意思:

   1,如果是select 或者 poll 其实 内核的select(fd4) poll(fd4)

   2,如果是epoll: 其实 内核的 epoll_wait()

   *, 参数可以带时间:没有时间,0 : 阻塞,如果有时间给设置一个超时

   selector.wakeup() 结果返回0

   懒加载:

   其实再触碰到selector.select()调用的时候触发了epoll_ctl的调用

   while (selector.select() 0) {

   Set SelectionKey selectionKeys = selector.selectedKeys(); //返回的有状态的fd集合

   Iterator SelectionKey iter = selectionKeys.iterator();

   //so,管你啥多路复用器,你呀只能给我状态,我还得一个一个的去处理他们的R/W。同步好辛苦!!!!!!!!

   // NIO 自己对着每一个fd调用系统调用,浪费资源,那么你看,这里是不是调用了一次select方法,知道具体的那些可以R/W了?

   //我前边可以强调过,socket: listen 通信 R/W

   while (iter.hasNext()) {

   SelectionKey key = iter.next();

   iter.remove(); //set 不移除会重复循环处理

   if (key.isAcceptable()) {

   //看代码的时候,这里是重点,如果要去接受一个新的连接

   //语义上,accept接受连接且返回新连接的FD对吧?

   //那新的FD怎么办?

   //如果是select,poll,因为他们内核没有空间,那么在jvm中保存和前边的fd4那个listen的一起

   //如果是epoll: 我们希望通过epoll_ctl把新的客户端fd注册到内核空间

   acceptHandler(key);

   } else if (key.isReadable()) {

   readHandler(key); //连read 还有 write都处理了

   //在当前线程读数据,这个方法可能会阻塞,如果阻塞了十年,其他的IO早就没电了。。。

   //所以,为什么提出了 IO THREADS

   //redis 是不是用了epoll,redis是不是有个io threads的概念 ,redis是不是单线程的

   //tomcat 8,9 异步的处理方式 IO 和 处理上 解耦

   } catch (IOException e) {

   e.printStackTrace();

   public void acceptHandler(SelectionKey key) {

   try {

   ServerSocketChannel ssc = (ServerSocketChannel) key.channel();

   SocketChannel client = ssc.accept(); //来啦,目的是调用accept接受客户端 fd7

   client.configureBlocking(false);

   ByteBuffer buffer = ByteBuffer.allocate(8192); //前边讲过了

   //你看,调用了register

   select,poll:jvm里开辟一个数组 fd7 放进去

   epoll: epoll_ctl(fd3,ADD,fd7,EPOLLIN

   client.register(selector, SelectionKey.OP_READ, buffer);

   System.out.println("-------------------------------------------");

   System.out.println("新客户端:" + client.getRemoteAddress());

   System.out.println("-------------------------------------------");

   } catch (IOException e) {

   e.printStackTrace();

   public void readHandler(SelectionKey key) {

   SocketChannel client = (SocketChannel) key.channel();

   ByteBuffer buffer = (ByteBuffer) key.attachment();

   buffer.clear();

   int read = 0;

   try {

   while (true) {

   read = client.read(buffer);

   if (read 0) {

   buffer.flip();

   while (buffer.hasRemaining()) {

   client.write(buffer);

   buffer.clear();

   } else if (read == 0) {

   break;

   } else {

   client.close();

   break;

   } catch (IOException e) {

   e.printStackTrace();

   public static void main(String[] args) {

   SocketMultiplexingSingleThreadv1 service = new SocketMultiplexingSingleThreadv1();

   service.start();

  

 

 

  先修改代码,注释掉 client.close(); 这一行
 

  启动代码,设置为使用POLL模型-Djava.nio.channels.spi.SelectorProvider=sun.nio.ch.PollSelectorProvider
 

  新建窗口,查看socket连接,可以看到程序启动后出现的那个listen的连接
 

  新建窗口,连接服务端9090端口
 

  切换另一个窗口,再查看一下连接,发现出现了建立的连接,还有客户端的程序连接
 

  Ctrl + C停掉客户端的正在连接
 

  切换回来,查看socket连接断开状态,服务端没有停掉,所以还是listen状态,客户端停掉了,所以客户端和服务端的连接已经CLOSE_WAIT,客户端停掉了所以FIN表示关闭了
 

  为什么出现CLOSE_WAIT?

  四次分手之前存在的状态
 

  当服务端收到一个客户端要和服务端进行断开的数据包FIN后,服务端会进入一个状态CLOSE_WAIT,然后给客户端发送一个FIN_ACK,客户端收到后会进入 FIN_WAIT2状态,但是这个时候客户端和服务端的连接不会断开,因为启动服务端时候client.close();这行代码注释了,所以没有断开。

  修改代码,把client.close();这行代码注释打开,然后启动,现在如果客户端关闭的话,就会和服务端断开连接了
 

  连接服务端9090端口,
 

  查看socket连接,客户端和服务端已经正常连接
 

  客户端发送数据也没问题,然后客户端断开连接
 

  断开连接后,查看socket连接,发现曾经的客户端nc的程序已经没了,状态变成了TIME_WAIT
 

  其实这时候服务端有个瞬时状态closed,虽然看不到,这时客户端会进入TIME_WAIT
 

  这个时候一直不停的去查看socket连接, 可以一直看到客户端的TIME_WAIT状态,一直不停的查看,等一会儿就可以看到,客户端这个状态TIME_WAIT直接没了。
 

  经历了一段时间才会消失,它等待的时间是2MSL,是一个报文活动时间的2倍的值,可能是30秒,1分钟或2分钟。因为最后一次发送确认断开连接的时候,没有确认最后一次的连接有没有到达对方,因为可能出现网络问题等因素,网络本来就是虚拟的,不是物理的,所以自己要稍微等一会儿。有可能最后的ack没有到达对方,自己多留一会资源。
 

  其实不管是客户端还是服务端先发起的关闭连接,如果是Server发起断开,那么也会出现这些状态。

  既然网络是个虚拟的,那到底会不会消耗资源?
 

  是会消耗的!消耗SOCKET四元组规则,当正常连接时候,会有一个文件描述符关联客户端和服务端的连接,
 

  使用命令lsop -p 服务端的java进程端口查看
 

  虽然客户端发起关闭连接后,虽然查看不到关联的客户端和服务端连接的文件描述符,
 

  但是这个四元组的连接还是要停留一会儿,还是会在内核里占用资源的,java进程已经结束了,但是内核中还存在,显示状态是TIME_WAIT
 

  所谓四元组,就是两者简历连接的唯一表示,比如两端的ip+端口号组成的四元组还是处于TIME_WAIT状态在等待的过程中,如果这个时候再疯狂的让客户端和服务端进行连接,是肯定不会出现当前这个TIME_WAIT状态一样的四元组的连接,因为还是正在被占用着的。
 

  JAVA的API执行底层实现

  通过上面的可以知道java的api只有一套,但可以通过设置让它底层使用poll或者epoll的模型

  OS:POLL

  交给了jdk,执行了native方法,其实最后是开辟了用户空间,在用户空间保存了fd

  查看这个poll.2114文件
 

  能看到得到一个4的文件描述符,然后做了非阻塞,等同于代码中的server.configureBlocking(false);
 

  搜9090端口,查找位置,可以知道绑定4到9090端口,对应代码server.bind(new InetSocketAddress(port));
 

  下面继续找,可以看到发起监听
 

  继续往下走,可以看到一个数据,里面有文件描述符,调用了POLLIN,还有一个文件描述符4,返回等于1,对应代码while (selector.select() 500) {
 

  往下看,调用了accept,得到一个37989客户端,返回一个7,也就是新的客户端,对应代码是acceptHandler方法里的ssc.accept();
 

  往下走看到给7做了个非阻塞,对应代码acceptHandler方法里的client.configureBlocking(false);
 

  再往下走等于程序重新进入下一次循环,又调用了poll,但这时候除了fd4和fd5,还有一个fd7,返回1表示有1个文件描述符有事件,如果返回-1,表示没有事件
 

  最后是从fd7里面读取到的数据
 

  OS:EPOLL

  查看epoll.2204文件
 

  很明显使用的是epoll,调用了epoll_create
 

  可以看到开辟一个文件描述符4,然后设置了非阻塞
 

  搜9090,一样的看到一个listen的连接绑定到文件描述符4
 

  
 

  目前位置,以上都适合poll是一样的

  往下找,使用了epoll_create,返回的epoll的文件描述符7,在文件描述符7里面添加了5,这个5不用管,它是个管道,应该找那个文件描述符4
 

  继续往下找,才是调用了epoll_ctl,在文件描述符7里面添加了4,就是把用户空间的listen4添加到了内存区域里,然后调用了epoll_wait,返回数量是1,里面是文件描述符4
 

  其实调用epoll_wait等于是代码里的while (selector.select() 500) {调用
 

  再往下就是调用了执行accept,得到的一个文件描述符8
 

  从测试的代码里应该知道,调用了accept后,给它设置了非阻塞
 

  再往下可以知道把文件描述符8添加到了7里面,关注的是EPOLLIN,数据有没有到达,
 

  执行后一定会调用epoll_wait,7里面已经有4和8两个文件描述符了,只是文件描述符8来了数据,有事件返回
 

  总结
 

  
 

  主线程里面在调用selector中比如有FD1、FD2两个文件描述符,在调用了系统调用时候,比如select方法,拿到返回值,也就是哪些是有读写事件的,然后去遍历一个key,比如是FD1,(这个是可以重复调用的),要去处理它的read handler,它会抛出去一个线程,这个线程中去调用FD1的read方法,执行完后会调用FD1.register selector OP_WRITE,关注selector是否有写事件,然后selector就会调用write hander,这个是会重复调用的,只要调用了write hander,它也会抛出线程。
 

  所以现在问题就是在主线程中不能阻塞执行,不能是线性的,所以会重复调起,解决方法就是key.cancel()加到FD1 read的重复调用时候,比如在使用epoll时候key.cancel调用的是内核的epoll_ctl

  我们为啥提出这个模型?
 

  考虑资源利用,充分利用cpu核数
 

  考虑有一个fd执行耗时,在一个线性里会阻塞后续FD的处理
 

  当有N个fd有R/W处理的时候:
 

  将N个FD 分组,每一组一个selector,将一个selector压到一个线程上
 

  最好的线程数量是:cpu cpu*2
 

  其实单看一个线程:里面有一个selector,有一部分FD,且他们是线性的
 

  多个线程,他们在自己的cpu上执行,代表会有多个selector在并行,且线程内是线性的,最终是并行的fd被处理
 

  但是,你得明白,还是一个selector中的fd要放到不同的线程并行,从而造成canel调用嘛? 不需要了!!!

  上边的逻辑其实就是分治,我的程序如果有100W个连接,如果有4个线程(selector),每个线程处理 250000
 

  那么,可不可以拿出一个线程的selector就只关注accpet ,然后把接受的客户端的FD,分配给其他线程的selector

  以上就是6、java API 实战多路复用器()的详细内容,想要了解更多 6、java API 实战多路复用器的内容,请持续关注盛行IT软件开发工作室。

郑重声明:本文由网友发布,不代表盛行IT的观点,版权归原作者所有,仅为传播更多信息之目的,如有侵权请联系,我们将第一时间修改或删除,多谢。

留言与评论(共有 条评论)
   
验证码: