Java阻塞队列中的异类,SynchronousQueue底层实现原理剖析(java阻塞队列和非阻塞队列)

  本篇文章为你整理了Java阻塞队列中的异类,SynchronousQueue底层实现原理剖析(java阻塞队列和非阻塞队列)的详细内容,包含有java阻塞队列线程安全吗 java阻塞队列和非阻塞队列 java阻塞队列和非阻塞队列的区别 java阻塞和非阻塞队列 Java阻塞队列中的异类,SynchronousQueue底层实现原理剖析,希望能帮助你了解 Java阻塞队列中的异类,SynchronousQueue底层实现原理剖析。

   上篇文章谈到BlockingQueue的使用场景,并重点分析了ArrayBlockingQueue的实现原理,了解到ArrayBlockingQueue底层是基于数组实现的阻塞队列。

  但是BlockingQueue的实现类中,有一种阻塞队列比较特殊,就是SynchronousQueue(同步移交队列),队列长度为0。

  
上篇文章谈到BlockingQueue的使用场景,并重点分析了ArrayBlockingQueue的实现原理,了解到ArrayBlockingQueue底层是基于数组实现的阻塞队列。

  但是BlockingQueue的实现类中,有一种阻塞队列比较特殊,就是SynchronousQueue(同步移交队列),队列长度为0。

  作用就是一个线程往队列放数据的时候,必须等待另一个线程从队列中取走数据。同样,从队列中取数据的时候,必须等待另一个线程往队列中放数据。

  这样特殊的队列,有什么应用场景呢?

  1. SynchronousQueue用法

  先看一个SynchronousQueue的简单用例:

  

/**

 

   * @author 一灯架构

   * @apiNote SynchronousQueue示例

  public class SynchronousQueueDemo {

   public static void main(String[] args) throws InterruptedException {

   // 1. 创建SynchronousQueue队列

   BlockingQueue Integer synchronousQueue = new SynchronousQueue ();

   // 2. 启动一个线程,往队列中放3个元素

   new Thread(() - {

   try {

   System.out.println(Thread.currentThread().getName() + " 入队列 1");

   synchronousQueue.put(1);

   Thread.sleep(1);

   System.out.println(Thread.currentThread().getName() + " 入队列 2");

   synchronousQueue.put(2);

   Thread.sleep(1);

   System.out.println(Thread.currentThread().getName() + " 入队列 3");

   synchronousQueue.put(3);

   } catch (InterruptedException e) {

   e.printStackTrace();

   }).start();

   // 3. 等待1000毫秒

   Thread.sleep(1000L);

   // 4. 再启动一个线程,从队列中取出3个元素

   new Thread(() - {

   try {

   System.out.println(Thread.currentThread().getName() + " 出队列 " + synchronousQueue.take());

   Thread.sleep(1);

   System.out.println(Thread.currentThread().getName() + " 出队列 " + synchronousQueue.take());

   Thread.sleep(1);

   System.out.println(Thread.currentThread().getName() + " 出队列 " + synchronousQueue.take());

   } catch (InterruptedException e) {

   e.printStackTrace();

   }).start();

  

 

  输出结果:

  

Thread-0 入队列 1

 

  Thread-1 出队列 1

  Thread-0 入队列 2

  Thread-1 出队列 2

  Thread-0 入队列 3

  Thread-1 出队列 3

  

 

  从输出结果中可以看到,第一个线程Thread-0往队列放入一个元素1后,就被阻塞了。直到第二个线程Thread-1从队列中取走元素1后,Thread-0才能继续放入第二个元素2。

  由于SynchronousQueue是BlockingQueue的实现类,所以也实现类BlockingQueue中几组抽象方法:

  为了满足不同的使用场景,BlockingQueue设计了很多的放数据和取数据的方法。

  
当队列满了,再往队列中放数据,add方法抛异常,offer方法返回false,put方法会一直阻塞(直到有其他线程从队列中取走数据),offer(e, time, unit)方法阻塞指定时间然后返回false。

  当队列是空,再从队列中取数据,remove方法抛异常,poll方法返回null,take方法会一直阻塞(直到有其他线程往队列中放数据),poll(time, unit)方法阻塞指定时间然后返回null。

  当队列是空,再去队列中查看数据(并不删除数据),element方法抛异常,peek方法返回null。

  工作中使用最多的就是offer、poll阻塞指定时间的方法。

  2. SynchronousQueue应用场景

  SynchronousQueue的特点:

  队列长度是0,一个线程往队列放数据,必须等待另一个线程取走数据。同样,一个线程从队列中取数据,必须等待另一个线程往队列中放数据。

  这种特殊的实现逻辑有什么应用场景呢?

  我的理解就是,如果你希望你的任务需要被快速处理,就可以使用这种队列。

  Java线程池中的newCachedThreadPool(带缓存的线程池)底层就是使用SynchronousQueue实现的。

  

public static ExecutorService newCachedThreadPool() {

 

   return new ThreadPoolExecutor(0, Integer.MAX_VALUE,

   60L, TimeUnit.SECONDS,

   new SynchronousQueue Runnable

  

 

  newCachedThreadPool线程池的核心线程数是0,最大线程数是Integer的最大值,线程存活时间是60秒。

  如果你使用newCachedThreadPool线程池,你提交的任务会被更快速的处理,因为你每次提交任务,都会有一个空闲的线程等着处理任务。如果没有空闲的线程,也会立即创建一个线程处理你的任务。

  你想想,这处理效率,杠杠滴!

  当然也有弊端,如果你提交了太多的任务,导致创建了大量的线程,这些线程都在竞争CPU时间片,等待CPU调度,处理任务速度也会变慢,所以在使用过程中也要综合考虑。

  3. SynchronousQueue源码解析

  3.1 SynchronousQueue类属性

  

public class SynchronousQueue E extends AbstractQueue E implements BlockingQueue E {

 

   // 转换器,取数据和放数据的核心逻辑都在这个类里面

   private transient volatile Transferer E transferer;

   // 默认的构造方法(使用非公平队列)

   public SynchronousQueue() {

   this(false);

   // 有参构造方法,可以指定是否使用公平队列

   public SynchronousQueue(boolean fair) {

   transferer = fair ? new TransferQueue E () : new TransferStack E

   // 转换器实现类

   abstract static class Transferer E {

   abstract E transfer(E e, boolean timed, long nanos);

   // 基于栈实现的非公平队列

   static final class TransferStack E extends Transferer E {

   // 基于队列实现的公平队列

   static final class TransferQueue E extends Transferer E {

  

 

  可以看到SynchronousQueue默认的无参构造方法,内部使用的是基于栈实现的非公平队列,当然也可以调用有参构造方法,传参是true,使用基于队列实现的公平队列。

  

// 使用非公平队列(基于栈实现)

 

  BlockingQueue Integer synchronousQueue = new SynchronousQueue ();

  // 使用公平队列(基于队列实现)

  BlockingQueue Integer synchronousQueue = new SynchronousQueue (true);

  

 

  本次就常用的栈实现来剖析SynchronousQueue的底层实现原理。

  3.2 栈底层结构

  栈结构,是非公平的,遵循先进后出。

  使用个case测试一下:

  

/**

 

   * @author 一灯架构

   * @apiNote SynchronousQueue示例

  public class SynchronousQueueDemo {

   public static void main(String[] args) throws InterruptedException {

   // 1. 创建SynchronousQueue队列

   SynchronousQueue Integer synchronousQueue = new SynchronousQueue ();

   // 2. 启动一个线程,往队列中放1个元素

   new Thread(() - {

   try {

   System.out.println(Thread.currentThread().getName() + " 入队列 0");

   synchronousQueue.put(0);

   } catch (InterruptedException e) {

   e.printStackTrace();

   }).start();

   // 3. 等待1000毫秒

   Thread.sleep(1000L);

   // 4. 启动一个线程,往队列中放1个元素

   new Thread(() - {

   try {

   System.out.println(Thread.currentThread().getName() + " 入队列 1");

   synchronousQueue.put(1);

   } catch (InterruptedException e) {

   e.printStackTrace();

   }).start();

   // 5. 等待1000毫秒

   Thread.sleep(1000L);

   // 6. 再启动一个线程,从队列中取出1个元素

   new Thread(() - {

   try {

   System.out.println(Thread.currentThread().getName() + " 出队列 " + synchronousQueue.take());

   } catch (InterruptedException e) {

   e.printStackTrace();

   }).start();

   // 7. 等待1000毫秒

   Thread.sleep(1000L);

   // 8. 再启动一个线程,从队列中取出1个元素

   new Thread(() - {

   try {

   System.out.println(Thread.currentThread().getName() + " 出队列 " + synchronousQueue.take());

   } catch (InterruptedException e) {

   e.printStackTrace();

   }).start();

  

 

  输出结果:

  

Thread-0 入队列 0

 

  Thread-1 入队列 1

  Thread-2 出队列 1

  Thread-3 出队列 0

  

 

  从输出结果中可以看出,符合栈结构先进后出的顺序。

  3.3 栈节点源码

  栈中的数据都是由一个个的节点组成的,先看一下节点类的源码:

  

// 节点

 

  static final class SNode {

   // 节点值(取数据的时候,该字段为null)

   Object item;

   // 存取数据的线程

   volatile Thread waiter;

   // 节点模式

   int mode;

   // 匹配到的节点

   volatile SNode match;

   // 后继节点

   volatile SNode next;

  

 

  
3.4 put/take流程

  放数据和取数据的逻辑,在底层复用的是同一个方法,以put/take方法为例,另外两个放数据的方法,add和offer方法底层实现是一样的。

  先看一下数据流转的过程,方便理解源码。

  还是以上面的case为例:

  Thread0先往SynchronousQueue队列中放入元素0

  Thread1再往SynchronousQueue队列放入元素1

  Thread2从SynchronousQueue队列中取出一个元素

  第一步:Thread0先往SynchronousQueue队列中放入元素0

  把本次操作组装成SNode压入栈顶,item是元素0,waiter是当前线程Thread0,mode是1表示放入数据。

  第二步:Thread1再往SynchronousQueue队列放入元素1

  把本次操作组装成SNode压入栈顶,item是元素1,waiter是当前线程Thread1,mode是1表示放入数据,next是SNode0。

  第三步:Thread2从SynchronousQueue队列中取出一个元素

  这次的操作比较复杂,也是先把本次的操作包装成SNode压入栈顶。

  item是null(取数据的时候,这个字段没有值),waiter是null(当前线程Thread2正在操作,所以不用赋值了),mode是2表示正在操作(即将跟后继节点进行匹配),next是SNode1。

  然后,Thread2开始把栈顶的两个节点进行匹配,匹配成功后,就把SNode2赋值给SNode1的match属性,唤醒SNode1中的Thread1线程,然后弹出SNode2节点和SNode1节点。

  3.5 put/take源码实现

  看完 了put/take流程,再来看源码就简单多了。

  先看一下put方法源码:

  

// 放数据

 

  public void put(E e) throws InterruptedException {

   // 不允许放null元素

   if (e == null)

   throw new NullPointerException();

   // 调用转换器实现类,放元素

   if (transferer.transfer(e, false, 0) == null) {

   // 如果放数据失败,就中断当前线程,并抛出异常

   Thread.interrupted();

   throw new InterruptedException();

  

 

  核心逻辑都在transfer方法中,代码很长,理清逻辑后,也很容易理解。

  

// 取数据和放数据操作,共用一个方法

 

  E transfer(E e, boolean timed, long nanos) {

   SNode s = null;

   // e为空,说明是取数据,否则是放数据

   int mode = (e == null) ? REQUEST : DATA;

   for (; ; ) {

   SNode h = head;

   // 1. 如果栈顶节点为空,或者栈顶节点类型跟本次操作相同(都是取数据,或者都是放数据)

   if (h == null h.mode == mode) {

   // 2. 判断节点是否已经超时

   if (timed nanos = 0) {

   // 3. 如果栈顶节点已经被取消,就删除栈顶节点

   if (h != null h.isCancelled())

   casHead(h, h.next);

   else

   return null;

   // 4. 把本次操作包装成SNode,压入栈顶

   } else if (casHead(h, s = snode(s, e, h, mode))) {

   // 5. 挂起当前线程,等待被唤醒

   SNode m = awaitFulfill(s, timed, nanos);

   // 6. 如果这个节点已经被取消,就删除这个节点

   if (m == s) {

   clean(s);

   return null;

   // 7. 把s.next设置成head

   if ((h = head) != null h.next == s)

   casHead(h, s.next);

   return (E) ((mode == REQUEST) ? m.item : s.item);

   // 8. 如果栈顶节点类型跟本次操作不同,并且不是FULFILLING类型

   } else if (!isFulfilling(h.mode)) {

   // 9. 再次判断如果栈顶节点已经被取消,就删除栈顶节点

   if (h.isCancelled())

   casHead(h, h.next);

   // 10. 把本次操作包装成SNode(类型是FULFILLING),压入栈顶

   else if (casHead(h, s = snode(s, e, h, FULFILLING mode))) {

   // 11. 使用死循环,直到匹配到对应的节点

   for (; ; ) {

   // 12. 遍历下个节点

   SNode m = s.next;

   // 13. 如果节点是null,表示遍历到末尾,设置栈顶节点是null,结束。

   if (m == null) {

   casHead(s, null);

   s = null;

   break;

   SNode mn = m.next;

   // 14. 如果栈顶的后继节点跟栈顶节点匹配成功,就删除这两个节点,结束。

   if (m.tryMatch(s)) {

   casHead(s, mn);

   return (E) ((mode == REQUEST) ? m.item : s.item);

   } else

   // 15. 如果没有匹配成功,就删除栈顶的后继节点,继续匹配

   s.casNext(m, mn);

   } else {

   // 16. 如果栈顶节点类型跟本次操作不同,并且是FULFILLING类型,

   // 就再执行一遍上面第11步for循环中的逻辑(很少概率出现)

   SNode m = h.next;

   if (m == null)

   casHead(h, null);

   else {

   SNode mn = m.next;

   if (m.tryMatch(h))

   casHead(h, mn);

   else

   h.casNext(m, mn);

  

 

  transfer方法逻辑也很简单,就是判断本次操作类型是否跟栈顶节点相同,如果相同,就把本次操作压入栈顶。否则就跟栈顶节点匹配,唤醒栈顶节点线程,弹出栈顶节点。

  transfer方法中调用了awaitFulfill方法,作用是挂起当前线程。

  

// 等待被唤醒

 

  SNode awaitFulfill(SNode s, boolean timed, long nanos) {

   // 1. 计算超时时间

   final long deadline = timed ? System.nanoTime() + nanos : 0L;

   Thread w = Thread.currentThread();

   // 2. 计算自旋次数

   int spins = (shouldSpin(s) ?

   (timed ? maxTimedSpins : maxUntimedSpins) : 0);

   for (;;) {

   if (w.isInterrupted())

   s.tryCancel();

   // 3. 如果已经匹配到其他节点,直接返回

   SNode m = s.match;

   if (m != null)

   return m;

   if (timed) {

   // 4. 超时时间递减

   nanos = deadline - System.nanoTime();

   if (nanos = 0L) {

   s.tryCancel();

   continue;

   // 5. 自旋次数减一

   if (spins 0)

   spins = shouldSpin(s) ? (spins-1) : 0;

   else if (s.waiter == null)

   s.waiter = w;

   // 6. 开始挂起当前线程

   else if (!timed)

   LockSupport.park(this);

   else if (nanos spinForTimeoutThreshold)

   LockSupport.parkNanos(this, nanos);

  

 

  awaitFulfill方法的逻辑也很简单,就是挂起当前线程。

  take方法底层使用的也是transfer方法:

  

// 取数据

 

  public E take() throws InterruptedException {

   // // 调用转换器实现类,取数据

   E e = transferer.transfer(null, false, 0);

   if (e != null)

   return e;

   // 没取到,就中断当前线程

   Thread.interrupted();

   throw new InterruptedException();

  

 

  4. 总结

  SynchronousQueue是一种特殊的阻塞队列,队列长度是0,一个线程往队列放数据,必须等待另一个线程取走数据。同样,一个线程从队列中取数据,必须等待另一个线程往队列中放数据。

  SynchronousQueue底层是基于栈和队列两种数据结构实现的。

  Java线程池中的newCachedThreadPool(带缓存的线程池)底层就是使用SynchronousQueue实现的。

  如果希望你的任务需要被快速处理,可以使用SynchronousQueue队列。

  
我是「一灯架构」,如果本文对你有帮助,欢迎各位小伙伴点赞、评论和关注,感谢各位老铁,我们下期见

  以上就是Java阻塞队列中的异类,SynchronousQueue底层实现原理剖析(java阻塞队列和非阻塞队列)的详细内容,想要了解更多 Java阻塞队列中的异类,SynchronousQueue底层实现原理剖析的内容,请持续关注盛行IT软件开发工作室。

郑重声明:本文由网友发布,不代表盛行IT的观点,版权归原作者所有,仅为传播更多信息之目的,如有侵权请联系,我们将第一时间修改或删除,多谢。

留言与评论(共有 条评论)
   
验证码: