ArrayBlockingQueue详解(arrayblockingqueue和linkedblockingqueue)

  本篇文章为你整理了ArrayBlockingQueue详解(arrayblockingqueue和linkedblockingqueue)的详细内容,包含有arrayblockingqueue和linked使用场景 arrayblockingqueue和linkedblockingqueue array block arrayblockingqueue是线程安全的吗 ArrayBlockingQueue详解,希望能帮助你了解 ArrayBlockingQueue详解。

  ArrayBlockingQueue介绍

   ArrayBlockingQueue是最典型的有界阻塞队列,其内部是用数组存储元素的,初始化时需要指定容量大小,利用 ReentrantLock 实现线程安全。

   在生产者-消费者模型中使用时,如果生产速度和消费速度基本匹配的情况下,使用ArrayBlockingQueue是个不错选择;当如果生产速度远远大于消费速度,则会导致队列填满,大量生产线程被阻塞。

   使用独占锁ReentrantLock实现线程安全,入队和出队操作使用同一个锁对象,也就是只能有一个线程可以进行入队或者出队操作;这也就意味着生产者和消费者无法并行操作,在高并发场景下会成为性能瓶颈。

  

  ArrayBlockingQueue的源码分析

   【1】属性值

  

/** 队列元素数组 */

 

  final Object[] items;

  /** 下一个被take,poll,peek,remove的元素位置 */

  int takeIndex;

  /** 插入位置包含put,offer,add */

  int putIndex;

  /** 队列元素的数量 */

  int count;

  /** 重入锁 */

  final ReentrantLock lock;

  /** 等待获取的条件队列 */

  private final Condition notEmpty;

  /** 等待插入的条件队列 */

  private final Condition notFull;

  //迭代器的共享状态

  transient Itrs itrs = null;

 

  

   【2】构造函数

  

//默认采用非公平锁

 

  public ArrayBlockingQueue(int capacity) {

   this(capacity, false);

  
public ArrayBlockingQueue(int capacity, boolean fair,Collection ? extends E c) {

   //初始化阻塞队列

   this(capacity, fair);

   //加锁将数组元素填入阻塞队列(主要是考虑到重排序和可见性问题,因为Object[] items 并没有加上 volatile 属性)

   final ReentrantLock lock = this.lock;

   lock.lock(); // Lock only for visibility, not mutual exclusion

   try {

   int i = 0;

   try {

   for (E e : c) {

   checkNotNull(e);

   items[i++] = e;

   } catch (ArrayIndexOutOfBoundsException ex) {

   throw new IllegalArgumentException();

   count = i;

   //将插入位置下变更

   putIndex = (i == capacity) ? 0 : i;

   } finally {

   lock.unlock();

  }

 

  

   【3】核心方法分析

   1)入队put方法

  

public void put(E e) throws InterruptedException {

 

   //检查是否为空

   checkNotNull(e);

   final ReentrantLock lock = this.lock;

   //加锁,如果线程中断抛出异常

   lock.lockInterruptibly();

   try {

   //阻塞队列已满,则将生产者挂起,等待消费者唤醒

   //设计注意点: 用while不用if是为了防止虚假唤醒

   while (count == items.length)

   notFull.await(); //队列满了,使用notFull等待(生产者阻塞)

   // 入队

   enqueue(e);

   } finally {

   lock.unlock(); // 唤醒消费者线程

  private void enqueue(E x) {

   final Object[] items = this.items;

   //入队 使用的putIndex

   items[putIndex] = x;

   if (++putIndex == items.length)

   putIndex = 0; //设计的精髓: 环形数组,putIndex指针到数组尽头了,返回头部

   count++;

   //notEmpty条件队列转同步队列,准备唤醒消费者线程,因为入队了一个元素,肯定不为空了

   notEmpty.signal();

  }

 

  

   2)出队take方法

  

public E take() throws InterruptedException {

 

   final ReentrantLock lock = this.lock;

   //加锁,如果线程中断抛出异常

   lock.lockInterruptibly();

   try {

   //如果队列为空,则消费者挂起

   while (count == 0)

   notEmpty.await();

   //出队

   return dequeue();

   } finally {

   lock.unlock();// 唤醒生产者线程

  private E dequeue() {

   final Object[] items = this.items;

   @SuppressWarnings("unchecked")

   E x = (E) items[takeIndex]; //取出takeIndex位置的元素

   items[takeIndex] = null;

   if (++takeIndex == items.length)

   takeIndex = 0; //设计的精髓: 环形数组,takeIndex 指针到数组尽头了,返回头部

   count--;

   if (itrs != null)

   itrs.elementDequeued();

   //notFull条件队列转同步队列,准备唤醒生产者线程,此时队列有空位

   notFull.signal();

   return x;

  }

 

  

   3)其余offer poll peek remove方法

  

public boolean offer(E e) {

 

   checkNotNull(e);

   final ReentrantLock lock = this.lock;

   lock.lock();

   try {

   if (count == items.length)

   return false;

   else {

   enqueue(e);

   return true;

   } finally {

   lock.unlock();

  //本质区别在于设置了超时时间,超时后选择不加入,返回false

  public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {

   checkNotNull(e);

   long nanos = unit.toNanos(timeout);

   final ReentrantLock lock = this.lock;

   lock.lockInterruptibly();

   try {

   while (count == items.length) {

   if (nanos = 0)

   return false;

   //生产线程堵塞nanos时间,也有可能被唤醒,如果超过nanos时间还未被唤醒,则nanos=0,再次循环,就会返回false

   nanos = notFull.awaitNanos(nanos);

   enqueue(e);

   return true;

   } finally {

   lock.unlock();

  public E poll() {

   final ReentrantLock lock = this.lock;

   lock.lock();

   try {

   return (count == 0) ? null : dequeue();

   } finally {

   lock.unlock();

  //本质区别在于设置了超时时间,超时后选择不获取,返回null

  public E poll(long timeout, TimeUnit unit) throws InterruptedException {

   long nanos = unit.toNanos(timeout);

   final ReentrantLock lock = this.lock;

   lock.lockInterruptibly();

   try {

   while (count == 0) {

   if (nanos = 0)

   return null;

   nanos = notEmpty.awaitNanos(nanos);

   return dequeue();

   } finally {

   lock.unlock();

  public E peek() {

   final ReentrantLock lock = this.lock;

   lock.lock();

   try {

   //通过下标查找直接返回

   return itemAt(takeIndex); // null when queue is empty

   } finally {

   lock.unlock();

  final E itemAt(int i) {

   return (E) items[i];

  public boolean remove(Object o) {

   if (o == null) return false;

   final Object[] items = this.items;

   final ReentrantLock lock = this.lock;

   lock.lock();

   try {

   if (count 0) {

   final int putIndex = this.putIndex;

   int i = takeIndex;

   do {

   if (o.equals(items[i])) {

   removeAt(i);

   return true;

   if (++i == items.length)

   i = 0;

   } while (i != putIndex);

   return false;

   } finally {

   lock.unlock();

  void removeAt(final int removeIndex) {

   final Object[] items = this.items;

   if (removeIndex == takeIndex) {

   // removing front item; just advance

   items[takeIndex] = null;

   if (++takeIndex == items.length)

   takeIndex = 0;

   count--;

   if (itrs != null)

   itrs.elementDequeued();

   } else {

   final int putIndex = this.putIndex;

   for (int i = removeIndex;;) {

   int next = i + 1;

   if (next == items.length)

   next = 0;

   if (next != putIndex) {

   items[i] = items[next];

   i = next;

   } else {

   items[i] = null;

   this.putIndex = i;

   break;

   count--;

   if (itrs != null)

   itrs.removedAt(removeIndex);

   notFull.signal();

  }

 

  

  ArrayBlockingQueue总结

   【1】有界阻塞队列,先进先出,存取相互排斥

   【2】数据结构:静态数组(容量固定须指定长度,没有扩容机制,没有元素的位置也占用空间,被null占位)

   【3】ReentrantLock锁保证互斥性:存取都是同一把锁,操作的是同一个数组对象,存取相互排斥

   【4】阻塞对象(notEmpty【出队:队列count=0,无元素可取时,阻塞在该对象上】,notFull【入队:队列count=length,放不进元素时,阻塞在该对象上】)

   【5】入队,从队首开始添加元素,记录putIndex(到队尾时设置为0),唤醒notEmpty

   【6】出队,从队首开始添加元素,记录takeIndex(到队尾时设置为0),唤醒notFull

   【7】两个指针都是从队首向队尾移动,保证队列的先进先出原则(亮点:利用指针和数组,形成环状结构,重复利用内存空间)

  

  以上就是ArrayBlockingQueue详解(arrayblockingqueue和linkedblockingqueue)的详细内容,想要了解更多 ArrayBlockingQueue详解的内容,请持续关注盛行IT软件开发工作室。

郑重声明:本文由网友发布,不代表盛行IT的观点,版权归原作者所有,仅为传播更多信息之目的,如有侵权请联系,我们将第一时间修改或删除,多谢。

留言与评论(共有 条评论)
   
验证码: