concurrentqueue java,linkedqueue队列实现

  concurrentqueue java,linkedqueue队列实现

  ConcurrentLinkedQueue

  JDK为场景提供了一系列并发安全队列。一般来说,根据实现方式的不同,可以分为阻塞队列和非阻塞队列。前者由lock实现,后者由CAS非阻塞算法实现。

  ConcurrentLinkedQueue内部的队列是使用单向链表实现的,其中两个volatile Node节点用于存储队列的头节点和尾节点。从下面的无参数构造函数可以看出,默认的头节点和尾节点是指向空项的sentinel节点。新元素被插入到队列的末尾,并且在出队时从队列的头部获得一个元素。

  public ConcurrentLinkedQueue(){ head=tail=new NodeE(null);}在节点内部,维护一个用volatile修饰的变量项来存储节点的值;Next用于存储链表的下一个节点,从而链接到一个单向无界链表。内部使用了UNSafe工具类提供的CAS算法,保证了进出队列时操作链表的原子性。

  下面介绍ConcurrentLinkedQueue的几种方法来介绍其实现原理。

  offer操作:提供的操作是在队列末尾添加一个元素,如果传递的参数为空,则抛出一个NPE异常。否则,该方法将始终返回true,因为ConcurrentLinkedQueue是一个无限队列。此外,由于使用了CAS非阻塞算法,该方法不会阻塞挂起的调用线程。我们来看看实现原理。

  公共布尔offer(E e) {//(1)e为空。这将抛出一个空指针异常checkNotNull(e);//(2)构造一个Node节点,在构造函数内部调用unsafe . putobject finalnodewnode=new nodee(e);//(3)从尾节点插入for(nodee t=tail,p=t;){ NodeE q=p . next;//(4)如果q==null表示P是尾节点,则插入if (if (q==null) {//p是最后一个节点//(5/(5)设置下一个节点if (p.casNext(null,New)){//成功的CAS是线性化点//对于e成为这个队列的一个元素,//对于New节点成为 live 。//(6)如果CAS成功,则新添加的节点已经放入链表,然后当前的尾节点if (p!=t) //一次跳两个节点casTail(t,new node);//失败是可以的。返回true}//CAS race输给另一个线程;重读next } else if (p==q) //我们已经脱离列表。如果tail不变,它//也将脱离列表,在这种情况下,我们需要//跳到head,从head所有活动节点总是//可到达的。否则新的尾巴是一个更好的赌注。p=(t!=(t=tail))?t :头;否则//在两跳后检查尾部更新。p=(p!=t t!=(t=tail))?t : q;}}先看一个线程调用offer(item)时的情况。首先,代码(1)对传递的参数执行null检查,如果使用null,将抛出NPE异常;否则,将执行代码(2)并使用item作为构造函数参数创建一个新节点,然后代码(3)将从队列的结束节点开始。

  环,打算从队列尾部添加元素。这时候节点p、t、head、tail同时指向了item为null的哨兵节点,由于哨兵节点的next 节点为null,所以这里q也指向null。代码(4)发现q->null则执行代码(5),通过CAS 原子操作判断p节点的next节点是否为null,如果为null 则使用节点newNode替换p的next节点,然后执行代码(6),这里由于p=t所以没有设置尾部节点,然后退出 offer方法。上面是一个线程调用offer方法的情况,如果多个线程同时调用,就会存在多个线程同时执行到代码(5)的情况。假设线程A调用offer(item1),线程B调用 ofer(item2),同时执行到代码(5)p.casNext(null, newNode)。由于CAS的比较设置操作是原子性的,所以这里假设线程A先执行了比较设置操作,发现当前p的 next 节点确实是null,则会原子性地更新next节点为iteml,这时候线程B也会判断p的next节点是否为null,结果发现不是null(因为线程A已经设置了p的next节点为iteml),则会跳到代码(3),然后执行到代码(4)。可见,offer 操作中的关键步骤是代码(5),通过原子CAS 操作来控制某时只有一个线程可以追加元素到队列末尾。进行CAS 竞争失败的线程会通过循环一次次尝试进行 CAS操作,直到CAS 成功才会返回,也就是通过使用无限循环不断进行 CAS 尝试方式来替代阻塞算法挂起调用线程。相比阻塞算法,这是使用CPU资源换取阻塞所带来的开销。

  add操作:

  add操作是在链表尾部添加一个元素,其实在内部调用的还是offer操作。

  

public boolean add(E e) { return offer(e);}

poll操作:

 

  poll操作是在队列头部获取并移除一个元素,如果队列为空则返回null。

  

public E poll() { restartFromHead: for (;;) { for (Node<E> h = head, p = h, q;;) { E item = p.item; if (item != null && p.casItem(item, null)) { // Successful CAS is the linearization point // for item to be removed from this queue. if (p != h) // hop two nodes at a time updateHead(h, ((q = p.next) != null) ? q : p); return item; } else if ((q = p.next) == null) { updateHead(h, p); return null; } else if (p == q) continue restartFromHead; else p = q; } }}

poll方法在移除一个元素时,只是简单地使用 CAS操作把当前节点的item值设置为null,然后通过重新设置头节点将该元素从队列里面移除,被移除的节点就成了孤立节点,这个节点会在垃圾回收时被回收掉。另外,如果在执行分支中发现头节点被修改了,要跳到外层循环重新获取新的头节点。

 

  peak操作:

  peak操作是获取队列头部获一个元素,如果队列为空则返回null。

  

public E peek() { restartFromHead: for (;;) { for (Node<E> h = head, p = h, q;;) { E item = p.item; //注释 if (item != null (q = p.next) == null) { updateHead(h, p); return item; } else if (p == q) continue restartFromHead; else p = q; } }}

Peek操作的代码结构与poll操作类似,不同之处在于我们在代码中标记注释的地方中少了castItem操作。其实这很正常,因为peek只是获取队列头元素值,并不清空其值。根据前面的介绍我们知道第一次执行offer后head指向的是哨兵节点(也就是item为null的节点),那么第一次执行peek时在注释处会发现item==null,然后执行q=p.next,这时候q节点指向的才是队列里面第一个真正的元素,或者如果队列为 null 则 q 指向 null。

 

  到此这篇关于Java并发编程之ConcurrentLinkedQueue队列详情的文章就介绍到这了,更多相关Java并发编程 ConcurrentLinkedQueue 内容请搜索盛行IT以前的文章或继续浏览下面的相关文章希望大家以后多多支持盛行IT!

郑重声明:本文由网友发布,不代表盛行IT的观点,版权归原作者所有,仅为传播更多信息之目的,如有侵权请联系,我们将第一时间修改或删除,多谢。

留言与评论(共有 条评论)
   
验证码: