7、急速理解Netty模型及IO模型(netty selector模型)

  本篇文章为你整理了7、急速理解Netty模型及IO模型(netty selector模型)的详细内容,包含有netty的nio模型 netty selector模型 netty reactor模型 netty ioratio 7、急速理解Netty模型及IO模型,希望能帮助你了解 7、急速理解Netty模型及IO模型。

  测试demo:

  

public class MainThread {

 

   public static void main(String[] args) {

   // 这里不错io和业务的事情

   // 创建IO Thread 一个或多个

  // SelectorThreadGroup stg = new SelectorThreadGroup(1);

   //混杂模式,只有一个线程负责accept,每个都会被分配client,进行读写

   SelectorThreadGroup boss = new SelectorThreadGroup(3);

   SelectorThreadGroup worker = new SelectorThreadGroup(3);

   // 把监听到的server注册到一个selector

   boss.setWorker(worker);

   // boss里选一个线程注册listen,除法bind,这个不选中的线程持有workerGroup的引用

   // 因为未来listen一旦accept得到client后要去worker中next出一个线程分配

   boss.bind(9999);

   boss.bind(8888);

   boss.bind(7777);

  

 

  

import java.io.IOException;

 

  import java.net.InetSocketAddress;

  import java.nio.channels.Channel;

  import java.nio.channels.ServerSocketChannel;

  import java.util.concurrent.atomic.AtomicInteger;

  public class SelectorThreadGroup {

   SelectorThread[] sts;

   ServerSocketChannel server;

   AtomicInteger xid = new AtomicInteger(0);

   SelectorThreadGroup stg = this;

   public void setWorker(SelectorThreadGroup stg) {

   this.stg = stg;

   public SelectorThreadGroup(int num) {

   sts = new SelectorThread[num];

   for (int i = 0; i num; i++) {

   sts[i] = new SelectorThread(this);

   new Thread(sts[i]).start();

   public void bind(int port) {

   try {

   server = ServerSocketChannel.open();

   server.configureBlocking(false);

   server.bind(new InetSocketAddress(port));

   // 注册到selector上

  // nextSelector(server);

  // nextSelectorV2(server);

   nextSelectorV3(server);

   } catch (IOException e) {

   e.printStackTrace();

   public void nextSelectorV3(Channel c) {

   try {

   if (c instanceof ServerSocketChannel) {

   // listen选择了boss组中的一个线程,要更新线程的worker组

   SelectorThread st = next();

   st.lbq.put(c);

   st.setWorker(stg);

   st.selector.wakeup();

   } else {

   SelectorThread st = nextV3();

   // 通过队列传递数据

   st.lbq.add(c);

   // 通过打断阻塞,让队友线程自己去完成注册selector

   st.selector.wakeup();

   } catch (InterruptedException e) {

   e.printStackTrace();

   public void nextSelectorV2(Channel c) {

   try {

   if (c instanceof ServerSocketChannel) {

   sts[0].lbq.put(c);

   sts[0].selector.wakeup();

   } else {

   SelectorThread st = nextV2();

   // 通过队列传递数据

   st.lbq.add(c);

   // 通过打断阻塞,让队友线程自己去完成注册selector

   st.selector.wakeup();

   } catch (InterruptedException e) {

   e.printStackTrace();

  
public void nextSelector(Channel c) { // 重点:c可能是server也可能是client

   SelectorThread st = next(); // 在main线程中取到SelectorThread对象

   // 通过队列传递数据

   st.lbq.add(c);

   // 通过打断阻塞,让队友线程自己去完成注册selector

   st.selector.wakeup();

   private SelectorThread nextV3() {

   int index = xid.incrementAndGet() % stg.sts.length;

   return stg.sts[index];

   private SelectorThread nextV2() {

   int index = xid.incrementAndGet() % (sts.length - 1);

   return sts[index + 1];

   private SelectorThread next() {

   int index = xid.incrementAndGet() % sts.length;

   return sts[index];

  

 

  

import java.io.IOException;

 

  import java.nio.ByteBuffer;

  import java.nio.channels.*;

  import java.util.Iterator;

  import java.util.Set;

  import java.util.concurrent.LinkedBlockingDeque;

  public class SelectorThread implements Runnable {

   // 每个线程对应一个selector,

   // 多线程情况下,该程序的并发客户端被分配到多个selector上

   // 每个客户端只绑定一个selector上,不会有交互问题

   Selector selector = null;

   LinkedBlockingDeque Channel lbq = new LinkedBlockingDeque ();

   SelectorThreadGroup stg;

   SelectorThread(SelectorThreadGroup selectorThreadGroup) {

   try {

   this.stg = selectorThreadGroup;

   selector = Selector.open();

   } catch (IOException e) {

   e.printStackTrace();

   @Override

   public void run() {

   // loop

   while (true) {

   try {

   System.out.println(Thread.currentThread().getName() + "before select..." + selector.keys().size());

   int nums = selector.select(); // 阻塞返回值一定会大于0

   Thread.sleep(1000);

   System.out.println(Thread.currentThread().getName() + "after select..." + selector.keys().size());

   if (nums 0) {

   Set SelectionKey selectionKeys = selector.selectedKeys();

   Iterator SelectionKey iterator = selectionKeys.iterator();

   while (iterator.hasNext()) {

   SelectionKey key = iterator.next();

   iterator.remove();

   if (key.isAcceptable()) {// 如果要建立连接

   acceptHandler(key);

   } else if (key.isReadable()) {// 如果可读

   readHander(key);

   } else if (key.isWritable()) {// 如果可写

   // 处理一些task

   if (!lbq.isEmpty()) { // 堆里的对象,线程的栈是独立的,堆是共享的

   Channel c = lbq.take();

   // 判断类型

   if (c instanceof ServerSocketChannel) {

   ServerSocketChannel serverSocketChannel = (ServerSocketChannel) c;

   serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

   System.out.println(Thread.currentThread().getName() + " register listen");

   } else if (c instanceof SocketChannel) {

   SocketChannel client = (SocketChannel) c;

   ByteBuffer buffer = ByteBuffer.allocateDirect(4096);

   client.register(selector, SelectionKey.OP_READ, buffer);

   System.out.println(Thread.currentThread().getName() + " register client " + client.getRemoteAddress());

   } catch (Exception e) {

   e.printStackTrace();

   private void readHander(SelectionKey key) {

   System.out.println(Thread.currentThread().getName() + " readHander...");

   ByteBuffer buffer = (ByteBuffer) key.attachment();

   SocketChannel client = (SocketChannel) key.channel();

   buffer.clear();

   while (true) {

   try {

   int num = client.read(buffer);

   if (num 0) {

   buffer.flip(); // 将读到的内容翻转,直接写出

   while (buffer.hasRemaining()) {

   client.write(buffer);

   buffer.clear();

   } else if (num == 0) {

   break;

   } else if (num 0) {// 客户端断开

   System.out.println("client: " + client.getRemoteAddress() + "closed...");

   key.channel();

   break;

   } catch (IOException e) {

   e.printStackTrace();

   private void acceptHandler(SelectionKey key) {

   System.out.println(Thread.currentThread().getName() + " acceptHandler......");

   ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel();

   try {

   SocketChannel client = serverSocketChannel.accept();

   client.configureBlocking(false); // 非阻塞

   // 选择多路复用器并且注册

  // stg.nextSelector(client);

  // stg.nextSelectorV2(client);

   stg.nextSelectorV3(client);

   } catch (IOException e) {

   e.printStackTrace();

   public void setWorker(SelectorThreadGroup stgWorker) {

   this.stg = stgWorker;

  

 

  以上就是7、急速理解Netty模型及IO模型(netty selector模型)的详细内容,想要了解更多 7、急速理解Netty模型及IO模型的内容,请持续关注盛行IT软件开发工作室。

郑重声明:本文由网友发布,不代表盛行IT的观点,版权归原作者所有,仅为传播更多信息之目的,如有侵权请联系,我们将第一时间修改或删除,多谢。

留言与评论(共有 条评论)
   
验证码: