本文主要详细介绍Java BlockingQueue的实现原理。阻塞队列是Java util.concurrent包下的重要数据结构。有兴趣的可以看看。
BlockingQueue是Java util.concurrent包中一个重要的数据结构。BlockingQueue提供了一种线程安全的访问队列的方式:当队列被阻塞插入数据时,如果队列已满,线程将阻塞并等待,直到队列未满;从阻塞队列中获取数据时,如果队列为空,线程将阻塞并等待,直到队列不为空。而且很多高级同步类的实现都是基于BlockingQueue的。
BlockingQueue 的操作方法
BlockingQueue有四种不同的方法来插入、删除和检查队列中的元素。如果不能立即执行请求的操作,每个方法的行为会有所不同。这些方法如下:
四组不同的行为方式解释:
抛出异常:如果尝试的操作不能立即执行,抛出异常。
特定值:如果尝试的操作不能立即执行,则返回一个特定值(通常为真/假)。
阻塞:如果尝试的操作不能立即执行,方法调用将阻塞,直到可以执行为止。
超时:如果尝试的操作不能立即执行,方法调用将阻塞,直到可以执行,但等待时间不会超过给定值。返回一个特定值,以表明操作是否成功(通常为真/假)。
无法将null插入BlockingQueue。如果尝试插入null,BlockingQueue将抛出NullPointerException。
您可以访问BlockingQueue中的所有元素,而不仅仅是开始和结束元素。例如,您将一个对象放入队列进行处理,但是您的应用程序想要取消它。然后,您可以调用remove(o)方法从队列中移除特定的对象。但是,这样效率不高,除非真的不得已,否则尽量不要用这种方法。
BlockingQueue 的实现类
BlockingQueue是一个接口,你需要使用它的一个实现来使用BlockingQueue。Java.util.concurrent包下如下BlockingQueue接口的实现类:
ArrayBlockingQueue:ArrayBlockingQueue是一个有界阻塞队列,其内部实现是将对象放入一个数组中。有界性意味着它不能存储无限数量的元素。它对可以同时存储的元素数量有一个上限。初始化的时候可以设置这个上限,但是事后不能修改(译者注:因为是基于数组实现的,所以具有数组的特性:一旦初始化,大小就不能修改了)。
DelayQueue:DelayQueue保留元素,直到特定延迟到期。注入的元素必须实现java.util.concurrent.Delayed接口。
010-5900LinkedBlockingQueue将其元素存储在链结构(链接节点)中。如果需要,这种链结构可以选择一个上限。如果没有定义上限,则为整数。MAX_VALUE将被用作上限。
LinkedBlockingQueue:PriorityBlockingQueue是一个无限并发队列。它使用与java.util.PriorityQueue类相同的排序规则。不能在此队列中插入空值。插入PriorityBlockingQueue的所有元素都必须实现java.lang.Comparable接口。因此,该队列中元素的顺序取决于您自己的Comparable的实现。
PriorityBlockingQueue:SynchronousQueue是一个特殊的队列,其内部同时只能容纳一个元素。如果队列中已经有一个元素,试图向队列中插入新元素的线程将阻塞,直到另一个线程将该元素从队列中取出。类似地,如果队列为空,试图从队列中提取元素的线程将阻塞,直到另一个线程向队列中插入新元素。所以,把这个阶层称为队列,显然有些夸张。更像是一个会合点。
使用例子:
阻塞队列的最长使用的例子就是生产者消费者模式,也是各种实现生产者消费者模式方式中首选的方式。使用者不用关心什么阻塞生产,什么时候阻塞消费,使用非常方便,代码如下:
包神话故事
导入Java。util。随机;
导入Java。util。并发。阻塞队列;
导入Java。util。并发。linkedblockingqueue
导入Java。util。并发。时间单位;
公共类BlockingQueueTest {
//生产者
公共静态类生成器实现可运行{
私有最终阻塞队列整数阻塞队列;
私有可变布尔标志;
私有随机的
公共生成器(BlockingQueueInteger阻塞队列){
这个。阻塞队列=阻塞队列;
标志=假
Random=new Random();
}
公共无效运行(){
而(!标志){
int info=随机。nextint(100);
尝试{
阻塞队列。put(info);
系统。出去。println(线程。当前线程().getName()' produce ' info);
线程。睡眠(50);
} catch (InterruptedException e) {
//TODO自动生成的捕捉块
e。printstacktrace();
}
}
}
公共无效关闭(){
标志=真
}
}
//消费者
公共静态类使用者实现可运行{
私有最终阻塞队列整数阻塞队列;
私有可变布尔标志;
公共使用者(BlockingQueueInteger阻塞队列){
这个。阻塞队列=阻塞队列;
}
公共无效运行(){
而(!标志){
(同Internationalorganizations)国际组织信息;
尝试{
info=阻塞队列。take();
系统。出去。println(线程。当前线程().getName()"消费者"信息);
线程。睡眠(50);
} catch (InterruptedException e) {
//TODO自动生成的捕捉块
e。printstacktrace();
}
}
}
公共无效关闭(){
标志=真
}
}
公共静态void main(String[] args){
BlockingQueueInteger阻塞队列=new LinkedBlockingQueueInteger(10);
生产者生产者=新生产者(阻塞队列);
消费者消费者=新消费者(阻塞队列);
//创建5个生产者,5个消费者
for(int I=0;i10i ){
如果(i5){
新线程(生产者,"生产者“我.start();
}否则{
新线程(消费者,"消费者“(i-5)).start();
}
}
尝试{
线程。睡眠(1000);
} catch (InterruptedException e) {
//TODO自动生成的捕捉块
e。printstacktrace();
}
生产。关闭();
消费者。关机();
}
}
SynchronousQueue:
其实阻塞队列实现阻塞同步的方式很简单,使用的就是是锁锁的多条件(条件)阻塞控制。使用阻塞队列封装了根据条件阻塞线程的过程,而我们就不用关心繁琐的等待/发信号操作了。
下面是Jdk 1.7中ArrayBlockingQueue部分代码:
public ArrayBlockingQueue(int capacity,boolean fair) {
如果(容量=0)
抛出新的IllegalArgumentException();
//创建数组
this.items=新对象[容量];
//创建锁和阻塞条件
lock=新的可重入锁(fair);
notEmpty=lock。新条件();
未满=锁定。新条件();
}
//添加元素的方法
公共无效卖出抛出中断异常{
checkNotNull(e);
最终重入锁lock=this。锁;
锁定。lock interruptible();
尝试{
而(计数==项目数。长度)
没有满。await();
//如果队列不满就入队
入队(e)和:
}最后{
锁定。unlock();
}
}
//入队的方法
私有空的排队(英x) {
最终对象[]项=this。物品;
items[put index]=x;
if ( putIndex==items.length)
put index=0;
数数;
notempty。signal();
}
//移除元素的方法
公共E take()引发中断的异常{
最终重入锁lock=this。锁;
锁定。lock interruptible();
尝试{
while (count==0)
notempty。await();
返回dequeue();
}最后{
锁定。unlock();
}
}
//出队的方法
私有E出列(){
最终对象[]项=this。物品;
@SuppressWarnings('未选中)
E x=(E)项[取指数];
items[take index]=null;
if ( takeIndex==items.length)
取指数=0;
count-;
如果(itrs!=空)
itrs。元素出列();
没有满。signal();
返回x;
阻塞队列原理:
同时发生的包下还提供双端阻塞队列(BlockingDeque),和阻塞队列是类似的,只不过阻塞队列提供从任意一端插入或者抽取元素的队列。
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持我们。
郑重声明:本文由网友发布,不代表盛行IT的观点,版权归原作者所有,仅为传播更多信息之目的,如有侵权请联系,我们将第一时间修改或删除,多谢。