不允许还有Java程序员不了解BlockingQueue阻塞队列的实现原理(java阻塞队列和非阻塞队列)

  本篇文章为你整理了不允许还有Java程序员不了解BlockingQueue阻塞队列的实现原理(java阻塞队列和非阻塞队列)的详细内容,包含有java阻塞队列线程安全吗 java阻塞队列和非阻塞队列 java 阻塞队列 非阻塞队列 java阻塞队列和非阻塞队列的区别 不允许还有Java程序员不了解BlockingQueue阻塞队列的实现原理,希望能帮助你了解 不允许还有Java程序员不了解BlockingQueue阻塞队列的实现原理。

   我们平时开发中好像很少使用到BlockingQueue(阻塞队列),比如我们想要存储一组数据的时候会使用ArrayList,想要存储**键值对**数据会使用HashMap,在什么场景下需要用到BlockingQueue呢?

  1. BlockingQueue的应用场景

  当我们处理完一批数据之后,需要把这批数据发给下游方法接着处理,但是下游方法的处理速率不受控制,可能时快时慢。如果下游方法的处理速率较慢,会拖慢当前方法的处理速率,这时候该怎么办呢?

  
我们平时开发中好像很少使用到BlockingQueue(阻塞队列),比如我们想要存储一组数据的时候会使用ArrayList,想要存储键值对数据会使用HashMap,在什么场景下需要用到BlockingQueue呢?

  1. BlockingQueue的应用场景

  当我们处理完一批数据之后,需要把这批数据发给下游方法接着处理,但是下游方法的处理速率不受控制,可能时快时慢。如果下游方法的处理速率较慢,会拖慢当前方法的处理速率,这时候该怎么办呢?

  你可能想到使用线程池,是个办法,不过需要创建很多线程,还要考虑下游方法支不支持并发,如果是CPU密集任务,可能多线程比单线程处理速度更慢,因为需要频繁上下文切换。

  这时候就可以考虑使用BlockingQueue,BlockingQueue最典型的应用场景就是上面这种生产者-消费者模型。生产者往队列中放数据,消费者从队列中取数据,中间使用BlockingQueue做缓冲队列,也就解决了生产者和消费者速率不同步的问题。

  你可能联想到了消息队列(MessageQueue),消息队列相当于分布式阻塞队列,而BlockingQueue相当于本地阻塞队列,只作用于本机器。对应的是分布式缓存(比如:Redis、Memcache)和本地缓存(比如:Guava、Caffeine)。

  另外很多框架中都有BlockingQueue的影子,比如线程池中就用到BlockingQueue做任务的缓冲。消息队列中发消息、拉取消息的方法也都借鉴了BlockingQueue,使用起来很相似。

  今天就一块深入剖析一下Queue的底层源码。

  2. BlockingQueue的用法

  BlockingQueue的用法非常简单,就是放数据和取数据。

  

/**

 

   * @apiNote BlockingQueue示例

   * @author 一灯架构

  public class Demo {

   public static void main(String[] args) throws InterruptedException {

   // 1. 创建队列,设置容量是10

   BlockingQueue Integer queue = new ArrayBlockingQueue (10);

   // 2. 往队列中放数据

   queue.put(1);

   // 3. 从队列中取数据

   Integer result = queue.take();

  

 

  为了满足不同的使用场景,BlockingQueue设计了很多的放数据和取数据的方法。

  
当队列满了,再往队列中放数据,add方法抛异常,offer方法返回false,put方法会一直阻塞(直到有其他线程从队列中取走数据),offer方法阻塞指定时间然后返回false。

  当队列是空,再从队列中取数据,remove方法抛异常,poll方法返回null,take方法会一直阻塞(直到有其他线程往队列中放数据),poll方法阻塞指定时间然后返回null。

  当队列是空,再去队列中查看数据(并不删除数据),element方法抛异常,peek方法返回null。

  工作中使用最多的就是offer、poll阻塞指定时间的方法。

  3. BlockingQueue实现类

  BlockingQueue常见的有下面5个实现类,主要是应用场景不同。

  
ArrayBlockingQueue

  基于数组实现的阻塞队列,创建队列时需指定容量大小,是有界队列。

  
LinkedBlockingQueue

  基于链表实现的阻塞队列,默认是无界队列,创建可以指定容量大小

  
4. BlockingQueue源码解析

  BlockingQueue的5种子类实现方式大同小异,这次就以最常用的ArrayBlockingQueue做源码解析。

  4.1 ArrayBlockingQueue类属性

  先看一下ArrayBlockingQueue类里面有哪些属性:

  

// 用来存放数据的数组

 

  final Object[] items;

  // 下次取数据的数组下标位置

  int takeIndex;

  // 下次放数据的数组下标位置

  int putIndex;

  // 当前已有元素的个数

  int count;

  // 独占锁,用来保证存取数据安全

  final ReentrantLock lock;

  // 取数据的条件

  private final Condition notEmpty;

  // 放数据的条件

  private final Condition notFull;

  

 

  ArrayBlockingQueue中4组存取数据的方法实现也是大同小异,本次以put和take方法进行解析。

  4.2 put方法源码解析

  无论是放数据还是取数据都是从队头开始,逐渐往队尾移动。

  

// 放数据,如果队列已满,就一直阻塞,直到有其他线程从队列中取走数据

 

  public void put(E e) throws InterruptedException {

   // 校验元素不能为空

   checkNotNull(e);

   final ReentrantLock lock = this.lock;

   // 加锁,加可中断的锁

   lock.lockInterruptibly();

   try {

   // 如果队列已满,就一直阻塞,直到被唤醒

   while (count == items.length)

   notFull.await();

   // 如果队列未满,就往队列添加元素

   enqueue(e);

   } finally {

   // 结束后,别忘了释放锁

   lock.unlock();

  // 实际往队列添加数据的方法

  private void enqueue(E x) {

   // 获取数组

   final Object[] items = this.items;

   // putIndex 表示本次插入的位置

   items[putIndex] = x;

   // ++putIndex 计算下次插入的位置

   // 如果本次插入的位置,正好是队尾,下次插入就从 0 开始

   if (++putIndex == items.length)

   putIndex = 0;

   // 元素数量加一

   count++;

   // 唤醒因为队列空等待的线程

   notEmpty.signal();

  

 

  源码中有个有意思的设计,添加元素的时候如果已经到了队尾,下次就从队头开始添加,相当于做成了一个循环队列。

  像下面这样:

  4.3 take方法源码

  

// 取数据,如果队列为空,就一直阻塞,直到有其他线程往队列中放数据

 

  public E take() throws InterruptedException {

   final ReentrantLock lock = this.lock;

   // 加锁,加可中断的锁

   lock.lockInterruptibly();

   try {

   // 如果队列为空,就一直阻塞,直到被唤醒

   while (count == 0)

   notEmpty.await();

   // 如果队列不为空,就从队列取数据

   return dequeue();

   } finally {

   // 结束后,别忘了释放锁

   lock.unlock();

  // 实际从队列取数据的方法

  private E dequeue() {

   // 获取数组

   final Object[] items = this.items;

   // takeIndex 表示本次取数据的位置,是上一次取数据时计算好的

   E x = (E) items[takeIndex];

   // 取完之后,就把队列该位置的元素删除

   items[takeIndex] = null;

   // ++takeIndex 计算下次取数据的位置

   // 如果本次取数据的位置,正好是队尾,下次就从 0 开始取数据

   if (++takeIndex == items.length)

   takeIndex = 0;

   // 元素数量减一

   count--;

   if (itrs != null)

   itrs.elementDequeued();

   // 唤醒被队列满所阻塞的线程

   notFull.signal();

   return x;

  

 

  4.4 总结

  ArrayBlockingQueue基于数组实现的阻塞队列,创建队列时需指定容量大小,是有界队列。

  ArrayBlockingQueue底层采用循环队列的形式,保证数组位置可以重复使用。

  ArrayBlockingQueue存取都采用ReentrantLock加锁,保证线程安全,在多线程环境下也可以放心使用。

  使用ArrayBlockingQueue的时候,预估好队列长度,保证生产者和消费者速率相匹配。

  
我是「一灯架构」,如果本文对你有帮助,欢迎各位小伙伴点赞、评论和关注,感谢各位老铁,我们下期见

  以上就是不允许还有Java程序员不了解BlockingQueue阻塞队列的实现原理(java阻塞队列和非阻塞队列)的详细内容,想要了解更多 不允许还有Java程序员不了解BlockingQueue阻塞队列的实现原理的内容,请持续关注盛行IT软件开发工作室。

郑重声明:本文由网友发布,不代表盛行IT的观点,版权归原作者所有,仅为传播更多信息之目的,如有侵权请联系,我们将第一时间修改或删除,多谢。

留言与评论(共有 条评论)
   
验证码: