LinkedBlockingQueue详解(linkedblockingqueue.poll())

  本篇文章为你整理了LinkedBlockingQueue详解(linkedblockingqueue.poll())的详细内容,包含有linkedblockedqueue linkedblockingqueue.poll() linkedblockingdeque linkedblockingqueue和linkedblockingdeque LinkedBlockingQueue详解,希望能帮助你了解 LinkedBlockingQueue详解。

  LinkedBlockingQueue介绍

   【1】LinkedBlockingQueue是一个基于链表实现的阻塞队列,默认情况下,该阻塞队列的大小为Integer.MAX_VALUE,由于这个数值特别大,所以 LinkedBlockingQueue 也被称作无界队列,代表它几乎没有界限,队列可以随着元素的添加而动态增长,但是如果没有剩余内存,则队列将抛出OOM错误。所以为了避免队列过大造成机器负载或者内存爆满的情况出现,我们在使用的时候建议手动传一个队列的大小。

   【2】LinkedBlockingQueue内部由单链表实现,只能从head取元素,从tail添加元素。LinkedBlockingQueue采用两把锁的锁分离技术实现入队出队互不阻塞,添加元素和获取元素都有独立的锁,也就是说LinkedBlockingQueue是读写分离的,读写操作可以并行执行。

  

  LinkedBlockingQueue使用

  

//指定队列的大小创建有界队列

 

  BlockingQueue Integer boundedQueue = new LinkedBlockingQueue (100);

  //无界队列

  BlockingQueue Integer unboundedQueue = new LinkedBlockingQueue ();

 

  

  LinkedBlockingQueue的源码分析

   【1】属性值

  

// 容量,指定容量就是有界队列

 

  private final int capacity;

  // 元素数量,用原子操作类的原因在于有两个线程都会操作需要保证可见性

  private final AtomicInteger count = new AtomicInteger();

  // 链表头 本身是不存储任何元素的,初始化时item指向null

  transient Node E head;

  // 链表尾

  private transient Node E last;

  // take锁 锁分离,提高效率

  private final ReentrantLock takeLock = new ReentrantLock();

  // notEmpty条件

  // 当队列无元素时,take锁会阻塞在notEmpty条件上,等待其它线程唤醒

  private final Condition notEmpty = takeLock.newCondition();

  // put锁

  private final ReentrantLock putLock = new ReentrantLock();

  // notFull条件

  // 当队列满了时,put锁会会阻塞在notFull上,等待其它线程唤醒

  private final Condition notFull = putLock.newCondition();

  //典型的单链表结构

  static class Node E {

   E item; //存储元素

   Node E next; //后继节点 单链表结构

   Node(E x) { item = x; }

  }

 

  

   【2】构造函数

  

public LinkedBlockingQueue() {

 

   // 如果没传容量,就使用最大int值初始化其容量

   this(Integer.MAX_VALUE);

  public LinkedBlockingQueue(int capacity) {

   if (capacity = 0) throw new IllegalArgumentException();

   this.capacity = capacity;

   // 初始化head和last指针为空值节点

   last = head = new Node E (null);

  public LinkedBlockingQueue(Collection ? extends E c) {

   this(Integer.MAX_VALUE);

   final ReentrantLock putLock = this.putLock;

   putLock.lock(); // 为保证可见性而加的锁

   try {

   int n = 0;

   for (E e : c) {

   if (e == null)

   throw new NullPointerException();

   if (n == capacity)

   throw new IllegalStateException("Queue full");

   enqueue(new Node E (e));

   ++n;

   count.set(n);

   } finally {

   putLock.unlock();

  }

 

  

   【3】核心方法分析

   1)入队put方法

  

public void put(E e) throws InterruptedException { 

 

   // 不允许null元素

   if (e == null) throw new NullPointerException();

   int c = -1;

   // 新建一个节点

   Node E node = new Node E (e);

   final ReentrantLock putLock = this.putLock;

   final AtomicInteger count = this.count;

   // 使用put锁加锁

   putLock.lockInterruptibly();

   try {

   // 如果队列满了,就阻塞在notFull上等待被其它线程唤醒(阻塞生产者线程)

   while (count.get() == capacity) {

   notFull.await();

   // 队列不满,就入队

   enqueue(node);

   c = count.getAndIncrement();// 队列长度加1,返回原值

   // 如果现队列长度小于容量,notFull条件队列转同步队列,准备唤醒一个阻塞在notFull条件上的线程(可以继续入队)

   // 这里为啥要唤醒一下呢?因为存在情况是,没人获取时,队列满了而且还不断有人塞数据,此时会一大批线程被阻塞,现在有空余位置了,应该被唤醒

   if (c + 1 capacity)

   notFull.signal();

   } finally {

   putLock.unlock(); // 真正唤醒生产者线程

   // 如果原队列长度为0,现在加了一个元素后立即唤醒阻塞在notEmpty上的线程

   if (c == 0)

   signalNotEmpty();

  private void enqueue(Node E node) {

   // 直接加到last后面,last指向入队元素

   last = last.next = node;

  private void signalNotEmpty() {

   final ReentrantLock takeLock = this.takeLock;

   takeLock.lock();// 加take锁

   try {

   notEmpty.signal();// notEmpty条件队列转同步队列,准备唤醒阻塞在notEmpty上的线程

   } finally {

   takeLock.unlock(); // 真正唤醒消费者线程

  }

 

  

   2)出队take方法

  

public E take() throws InterruptedException {

 

   E x;

   int c = -1;

   final AtomicInteger count = this.count;

   final ReentrantLock takeLock = this.takeLock;

   // 使用takeLock加锁

   takeLock.lockInterruptibly();

   try {

   // 如果队列无元素,则阻塞在notEmpty条件上(消费者线程阻塞)

   while (count.get() == 0) {

   notEmpty.await();

   // 否则,出队

   x = dequeue();

   c = count.getAndDecrement();//长度-1,返回原值

   if (c 1)// 如果取之前队列长度大于1,notEmpty条件队列转同步队列,准备唤醒阻塞在notEmpty上的线程,原因与入队同理

   notEmpty.signal();

   } finally {

   takeLock.unlock(); // 真正唤醒消费者线程

   // 为什么队列是满的才唤醒阻塞在notFull上的线程呢?

   // 因为唤醒是需要加putLock的,这是为了减少锁的次数,所以,这里索性在放完元素就检测一下,未满就唤醒其它notFull上的线程,

   // 这也是锁分离带来的代价

   // 如果取之前队列长度等于容量(已满),则唤醒阻塞在notFull的线程

   if (c == capacity)

   signalNotFull();

   return x;

  private E dequeue() {

   // head节点本身是不存储任何元素的

   // 这里把head删除,并把head下一个节点作为新的值

   // 并把其值置空,返回原来的值

   Node E h = head;

   Node E first = h.next;

   h.next = h; // 方便GC

   head = first;

   E x = first.item;

   first.item = null;

   return x;

  private void signalNotFull() {

   final ReentrantLock putLock = this.putLock;

   putLock.lock();

   try {

   notFull.signal();// notFull条件队列转同步队列,准备唤醒阻塞在notFull上的线程

   } finally {

   putLock.unlock(); // 解锁,这才会真正的唤醒生产者线程

  }

 

  

  LinkedBlockingQueue总结

   【1】无界阻塞队列,可以指定容量,默认为 Integer.MAX_VALUE,先进先出,存取互不干扰

   【2】数据结构:链表(可以指定容量,默认为Integer.MAX_VALUE,内部类Node存储元素)

   【3】锁分离:存取互不干扰,存取操作的是不同的Node对象(takeLock【取Node节点保证前驱后继不乱】,putLock【存Node节点保证前驱后继不乱】,删除时则两个锁一起加)【这是最大的亮点】

   【4】阻塞对象(notEmpty【出队:队列count=0,无元素可取时,阻塞在该对象上】,notFull【入队:队列count=capacity,放不进元素时,阻塞在该对象上】)
 

   【5】入队,从队尾入队,由last指针记录。
 

   【6】出队,从队首出队,由head指针记录。

   【7】线程池中采用LinkedBlockingQueue而不采用ArrayBlockingQueue的原因便是因为锁分离带来了性能的提升,大大提高队列的吞吐量。

  以上就是LinkedBlockingQueue详解(linkedblockingqueue.poll())的详细内容,想要了解更多 LinkedBlockingQueue详解的内容,请持续关注盛行IT软件开发工作室。

郑重声明:本文由网友发布,不代表盛行IT的观点,版权归原作者所有,仅为传播更多信息之目的,如有侵权请联系,我们将第一时间修改或删除,多谢。

留言与评论(共有 条评论)
   
验证码: