SynchronousQueue详解(synchronizedqueue)

  本篇文章为你整理了SynchronousQueue详解(synchronizedqueue)的详细内容,包含有synchronizequeue synchronizedqueue synchronized详解 synchronous interaction SynchronousQueue详解,希望能帮助你了解 SynchronousQueue详解。

  SynchronousQueue介绍

   【1】SynchronousQueue是一个没有数据缓冲的BlockingQueue,生产者线程对其的插入操作put必须等待消费者的移除操作take。

  

   【2】如图所示,SynchronousQueue 最大的不同之处在于,它的容量为 0,所以没有一个地方来暂存元素,导致每次取数据都要先阻塞,直到有数据被放入;同理,每次放数据的时候也会阻塞,直到有消费者来取。

   【3】需要注意的是,SynchronousQueue 的容量不是 1 而是 0,因为 SynchronousQueue 不需要去持有元素,它所做的就是直接传递(direct handoff)。由于每当需要传递的时候,SynchronousQueue 会把元素直接从生产者传给消费者,在此期间并不需要做存储,所以如果运用得当,它的效率是很高的。

  SynchronousQueue的源码分析

   【1】构造函数

  

//默认采用非公平

 

  public SynchronousQueue() {

   this(false);

  //可以选择模式

  public SynchronousQueue(boolean fair) {

   transferer = fair ? new TransferQueue E () : new TransferStack E ();

  }

 

   【2】核心方法分析

  

//这些方法本质上都是调用属性值transferer的transfer方法

 

  public void put(E e) throws InterruptedException {

   if (e == null) throw new NullPointerException();

   if (transferer.transfer(e, false, 0) == null) {

   Thread.interrupted();

   throw new InterruptedException();

  public boolean offer(E e) {

   if (e == null) throw new NullPointerException();

   return transferer.transfer(e, true, 0) != null;

  public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {

   if (e == null) throw new NullPointerException();

   if (transferer.transfer(e, true, unit.toNanos(timeout)) != null)

   return true;

   if (!Thread.interrupted())

   return false;

   throw new InterruptedException();

  public E take() throws InterruptedException {

   E e = transferer.transfer(null, false, 0);

   if (e != null)

   return e;

   Thread.interrupted();

   throw new InterruptedException();

  public E poll(long timeout, TimeUnit unit) throws InterruptedException {

   E e = transferer.transfer(null, true, unit.toNanos(timeout));

   if (e != null !Thread.interrupted())

   return e;

   throw new InterruptedException();

  public E poll() {

   return transferer.transfer(null, true, 0);

  }

 

  

  Transferer分析

   【1】Transferer是SynchronousQueue的内部抽象类,双栈和双队列算法共享该类。他只有一个transfer方法,用于转移元素,从生产者转移到消费者;或者消费者调用该方法从生产者取数据。

   【2】Transferer有两个实现类:TransferQueue和TransferStack。

   【3】这两个类的区别就在于是否公平。TransferQueue是公平的,TransferStack非公平。

   【4】源码展示

  

// 堆栈和队列共同的接口,负责执行 put or take

 

  abstract static class Transferer E {

   // e 为空的,会直接返回特殊值,不为空会传递给消费者

   // timed 为 true,说明会有超时时间

   abstract E transfer(E e, boolean timed, long nanos);

  }

 

  

  TransferQueue分析

   【1】节点元素

  

//队列节点元素

 

  static final class QNode {

   // 当前元素的下一个元素

   volatile QNode next;

   // 当前元素的值,如果当前元素被阻塞住了,等其他线程来唤醒自己时,其他线程会把自己 set 到 item 里面

   volatile Object item;

   // 可以阻塞住的当前线程

   volatile Thread waiter;

   // 节点类型:true是 put,false是 take

   final boolean isData;

   ....

  }

 

   【2】构造方法

  

//队列头结点指针

 

  transient volatile QNode head;

  //队列尾结点指针

  transient volatile QNode tail;

  TransferQueue() {

   QNode h = new QNode(null, false); // initialize to dummy node.

   head = h;

   tail = h;

  }

 

   【3】核心方法

  

@SuppressWarnings("unchecked")

 

  E transfer(E e, boolean timed, long nanos) {

   QNode s = null;

   //根据是否传入数据 判断是获取还是存放

   boolean isData = (e != null);

   for (;;) {

   // 队列头和尾的临时变量,队列是空的时候,t=h

   QNode t = tail;

   QNode h = head;

   // tail 和 head 没有初始化时,无限循环,虽然这种 continue 非常耗cpu,但感觉不会碰到这种情况

   // 因为 tail 和 head 在 TransferQueue 初始化的时候,就已经被赋值空节点了

   if (t == null h == null) // saw uninitialized value

   continue; // spin

   // 首尾节点相同,说明是空队列

   // 或者尾节点的操作和当前节点操作一致

   if (h == t t.isData == isData) { // empty or same-mode

   QNode tn = t.next;

   if (t != tail) //直至拿到尾节点

   continue;

   if (tn != null) { // lagging tail

   advanceTail(t, tn);

   continue;

   //超时直接返回 null

   if (timed nanos = 0) // cant wait

   return null;

   //构建新节点

   if (s == null)

   s = new QNode(e, isData);

   //将新建节点塞入队列

   if (!t.casNext(null, s)) // failed to link in

   continue;

   advanceTail(t, s);

   // 阻塞住自己

   Object x = awaitFulfill(s, e, timed, nanos);

   if (x == s) { // wait was cancelled

   clean(t, s);

   return null;

   if (!s.isOffList()) { // not already unlinked

   advanceHead(t, s); // unlink if head

   if (x != null) // and forget fields

   s.item = s;

   s.waiter = null;

   return (x != null) ? (E)x : e;

   // 队列不为空,并且当前操作和队尾不一致,也就是说当前操作是队尾是对应的操作

   // 比如说队尾是因为 take 被阻塞的,那么当前操作必然是 put

   else {

   // 也就是这行代码体现出队列的公平,每次操作时,从头开始按照顺序进行操作

   QNode m = h.next;

   if (t != tail m == null h != head)

   continue; // inconsistent read

   Object x = m.item;

   if (isData == (x != null) // m already fulfilled

   x == m // m cancelled

   !m.casItem(x, e)) { // lost CAS

   advanceHead(h, m); // dequeue and retry

   continue;

   // 当前操作放到队头

   advanceHead(h, m);

   // 释放队头阻塞节点

   LockSupport.unpark(m.waiter);

   return (x != null) ? (E)x : e;

  }

 

  

  TransferStack分析

   【1】节点元素

  

// 栈中节点的几种类型:

 

  // 1. 消费者(请求数据的)

  static final int REQUEST = 0;

  // 2. 生产者(提供数据的)

  static final int DATA = 1;

  // 3. 二者正在匹配中

  static final int FULFILLING = 2;

  // 栈中的节点

  static final class SNode {

   // 下一个节点

   volatile SNode next;

   volatile SNode match; // the node matched to this

   // 等待着的线程

   volatile Thread waiter;

   Object item;

   // 模式,也就是节点的类型,是消费者,是生产者,还是正在匹配中

   int mode;

  }

 

  

   【2】核心方法

  

// TransferStack.transfer()方法

 

  E transfer(E e, boolean timed, long nanos) {

   SNode s = null; // constructed/reused as needed

   // 根据e是否为null决定是生产者还是消费者

   int mode = (e == null) ? REQUEST : DATA;

   // 自旋+CAS

   for (;;) {

   // 栈顶元素

   SNode h = head;

   // 栈顶没有元素,或者栈顶元素跟当前元素是一个模式的

   // 也就是都是生产者节点或者都是消费者节点

   if (h == null h.mode == mode) { // empty or same-mode

   // 如果有超时而且已到期

   if (timed nanos = 0) { // cant wait

   // 如果头节点不为空且是取消状态

   if (h != null h.isCancelled())

   // 就把头节点弹出,并进入下一次循环

   casHead(h, h.next); // pop cancelled node

   else

   // 否则,直接返回null(超时返回null)

   return null;

   } else if (casHead(h, s = snode(s, e, h, mode))) {

   // 入栈成功(因为是模式相同的,所以只能入栈)

   // 调用awaitFulfill()方法自旋+阻塞当前入栈的线程并等待被匹配到

   SNode m = awaitFulfill(s, timed, nanos);

   // 如果m等于s,说明取消了,那么就把它清除掉,并返回null

   if (m == s) { // wait was cancelled

   clean(s);

   // 被取消了返回null

   return null;

   // 到这里说明匹配到元素了

   // 因为从awaitFulfill()里面出来要不被取消了要不就匹配到了

   // 如果头节点不为空,并且头节点的下一个节点是s

   // 就把头节点换成s的下一个节点

   // 也就是把h和s都弹出了

   // 也就是把栈顶两个元素都弹出了

   if ((h = head) != null h.next == s)

   casHead(h, s.next); // help ss fulfiller

   // 根据当前节点的模式判断返回m还是s中的值

   return (E) ((mode == REQUEST) ? m.item : s.item);

   } else if (!isFulfilling(h.mode)) { // try to fulfill

   // 到这里说明头节点和当前节点模式不一样

   // 如果头节点不是正在匹配中

   // 如果头节点已经取消了,就把它弹出栈

   if (h.isCancelled()) // already cancelled

   casHead(h, h.next); // pop and retry

   else if (casHead(h, s=snode(s, e, h, FULFILLINGmode))) {

   // 头节点没有在匹配中,就让当前节点先入队,再让他们尝试匹配

   // 且s成为了新的头节点,它的状态是正在匹配中

   for (;;) { // loop until matched or waiters disappear

   SNode m = s.next; // m is ss match

   // 如果m为null,说明除了s节点外的节点都被其它线程先一步匹配掉了

   // 就清空栈并跳出内部循环,到外部循环再重新入栈判断

   if (m == null) { // all waiters are gone

   casHead(s, null); // pop fulfill node

   s = null; // use new node next time

   break; // restart main loop

   SNode mn = m.next;

   // 如果m和s尝试匹配成功,就弹出栈顶的两个元素m和s

   if (m.tryMatch(s)) {

   casHead(s, mn); // pop both s and m

   // 返回匹配结果

   return (E) ((mode == REQUEST) ? m.item : s.item);

   } else // lost match

   // 尝试匹配失败,说明m已经先一步被其它线程匹配了

   // 就协助清除它

   s.casNext(m, mn); // help unlink

   } else { // help a fulfiller

   // 到这里说明当前节点和头节点模式不一样

   // 且头节点是正在匹配中

   SNode m = h.next; // m is hs match

   if (m == null) // waiter is gone

   // 如果m为null,说明m已经被其它线程先一步匹配了

   casHead(h, null); // pop fulfilling node

   else {

   SNode mn = m.next;

   // 协助匹配,如果m和s尝试匹配成功,就弹出栈顶的两个元素m和s

   if (m.tryMatch(h)) // help match

   // 将栈顶的两个元素弹出后,再让s重新入栈

   casHead(h, mn); // pop both h and m

   else // lost match

   // 尝试匹配失败,说明m已经先一步被其它线程匹配了

   // 就协助清除它

   h.casNext(m, mn); // help unlink

  // 三个参数:需要等待的节点,是否需要超时,超时时间

  SNode awaitFulfill(SNode s, boolean timed, long nanos) {

   // 到期时间

   final long deadline = timed ? System.nanoTime() + nanos : 0L;

   // 当前线程

   Thread w = Thread.currentThread();

   // 自旋次数

   int spins = (shouldSpin(s) ?

   (timed ? maxTimedSpins : maxUntimedSpins) : 0);

   for (;;) {

   // 当前线程中断了,尝试清除s

   if (w.isInterrupted())

   s.tryCancel();

   // 检查s是否匹配到了元素m(有可能是其它线程的m匹配到当前线程的s)

   SNode m = s.match;

   // 如果匹配到了,直接返回m

   if (m != null)

   return m;

   // 如果需要超时

   if (timed) {

   // 检查超时时间如果小于0了,尝试清除s

   nanos = deadline - System.nanoTime();

   if (nanos = 0L) {

   s.tryCancel();

   continue;

   if (spins 0)

   // 如果还有自旋次数,自旋次数减一,并进入下一次自旋

   spins = shouldSpin(s) ? (spins-1) : 0;

   // 后面的elseif都是自旋次数没有了

   else if (s.waiter == null)

   // 如果s的waiter为null,把当前线程注入进去,并进入下一次自旋

   s.waiter = w; // establish waiter so can park next iter

   else if (!timed)

   // 如果不允许超时,直接阻塞,并等待被其它线程唤醒,唤醒后继续自旋并查看是否匹配到了元素

   LockSupport.park(this);

   else if (nanos spinForTimeoutThreshold)

   // 如果允许超时且还有剩余时间,就阻塞相应时间

   LockSupport.parkNanos(this, nanos);

  // SNode里面的方向,调用者m是s的下一个节点

  // 这时候m节点的线程应该是阻塞状态的

  boolean tryMatch(SNode s) {

   // 如果m还没有匹配者,就把s作为它的匹配者

   if (match == null

   UNSAFE.compareAndSwapObject(this, matchOffset, null, s)) {

   Thread w = waiter;

   if (w != null) { // waiters need at most one unpark

   waiter = null;

   // 唤醒m中的线程,两者匹配完毕

   LockSupport.unpark(w);

   // 匹配到了返回true

   return true;

   // 可能其它线程先一步匹配了m,返回其是否是s

   return match == s;

  }

 

  

  SynchronousQueue总结

   【1】是一个没有数据缓冲的BlockingQueue,容量为0,它不会为队列中元素维护存储空间,它只是多个线程之间数据交换的媒介。

   【2】数据结构:链表,在其内部类中维护了数据

   先消费(take),后生产(put);
 

   第一个线程Thread0是消费者访问,此时队列为空,则入队(创建Node结点并赋值)

   第二个线程Thread1也是消费者访问,与队尾模式相同,继续入队

   第三个线程Thread2是生产者,携带了数据e,与队尾模式不同,不进行入队操作。直接将该线程携带的数据e返回给队首的消费者,并唤醒队首线程Thread1(默认非公平策略是栈结构),出队。

   反之,先生产(put)后消费(take),原理一样

   【3】锁:CAS+自旋(无锁)【阻塞:自旋了一定次数后调用 LockSupport.park()】

   【4】存取调用同一个方法:transfer()

   put、offer 为生产者,携带了数据 e,为 Data 模式,设置到 SNode或QNode 属性中。

   take、poll 为消费者,不携帯数据,为 Request 模式,设置到 SNode或QNode属性中。

   【5】过程

   线程访问阻塞队列,先判断队尾节点或者栈顶节点的 Node 与当前入队模式是否相同

   相同则构造节点 Node 入队,并阻塞当前线程,元素 e 和线程赋值给 Node 属性

   不同则将元素 e(不为 null) 返回给取数据线程,队首或栈顶线程被唤醒,出队

   【6】公平模式:TransferQueue,队尾匹配(判断模式),队头出队,先进先出

   【7】非公平模式(默认策略):TransferStack,栈顶匹配,栈顶出栈,后进先出

   【8】应用场景

   SynchronousQueue非常适合传递性场景做交换工作,生产者的线程和消费者的线程同步传递某些信息、事件或者任务。

   SynchronousQueue的一个使用场景是在线程池里。如果我们不确定来自生产者请求数量,但是这些请求需要很快的处理掉,那么配合SynchronousQueue为每个生产者请求分配一个消费线程是处理效率最高的办法。Executors.newCachedThreadPool()就使用了SynchronousQueue,这个线程池根据需要(新任务到来时)创建新的线程,如果有空闲线程则会重复使用,线程空闲了60秒后会被回收。

  

 

 

  以上就是SynchronousQueue详解(synchronizedqueue)的详细内容,想要了解更多 SynchronousQueue详解的内容,请持续关注盛行IT软件开发工作室。

郑重声明:本文由网友发布,不代表盛行IT的观点,版权归原作者所有,仅为传播更多信息之目的,如有侵权请联系,我们将第一时间修改或删除,多谢。

留言与评论(共有 条评论)
   
验证码: