本文主要介绍Java8 parallelStream并发安全的相关信息。通过示例代码非常详细,对大家的学习或者工作都有一定的参考价值。有需要的朋友就跟着下面的边肖学习吧。
背景
Java8的流接口大大降低了编写循环的复杂性。stream提供了map/reduce/collect等一系列聚合接口,还支持并发操作:parallelStream。
在爬虫开发过程中,经常会遇到遍历大型集合做重复操作的情况。这时候如果用串行执行的话会相当耗时,所以一般采用多线程来加速。Java8的ParalleStream提供了fork/join框架的并发执行能力。但如果使用不当,很容易陷入误区。
Java8的paralleStream是线程安全的吗
一个简单的例子,在下面的代码中,使用stream的forEach接口遍历1-10000,并分别插入到三个ArrayList中。第一个列表通过串行遍历插入,第二个使用并行流,第三个使用并行流并使用ReentryLock来同步插入列表的操作:
private static list integer list 1=new ArrayList();
private static list integer list 2=new ArrayList();
private static list integer list 3=new ArrayList();
私有静态锁Lock=new reentrant Lock();
公共静态void main(String[] args) {
IntStream.range(0,10000)。forEach(list 1:add);
IntStream.range(0,10000)。平行()。forEach(list 2:add);
IntStream.range(0,10000)。forEach(i - {
lock . lock();
尝试{
list 3 . add(I);
}最后{
lock . unlock();
}
});
System.out.println('串行执行的大小:' list 1 . size());
System.out.println('并行执行的大小:' list 2 . size());
System.out.println('锁的并行执行大小:' list 3 . Size());
}
执行结果:
串行执行的大小:10000
并行执行的规模:9595
锁并行执行的大小:10000
并且每个结果中并行执行的大小是不同的,而串行和锁定的结果总是正确的。显然,stream.parallel.forEach()中执行的操作不是线程安全的。
那么,既然paralleStream不是线程安全的,那么它里面的所有非原子操作都必须被锁定吗?我在stackOverflow上找到了答案:
https://code review . stack exchange . com/questions/60401/using-Java-8-parallel-streams
https://stack overflow . com/questions/22350288/parallel-streams-collectors-and-thread-safety
在对以上两个问题的回答中,证明了paralleStream的forEach接口不能保证同步,也提出了解决方案:使用collect和reduce接口。
http://docs . Oracle . com/javase/tutorial/collections/streams/parallelism . html
Javadoc中还引入了流的并发操作:
集合框架提供了同步包装器,它为任意集合添加了自动同步,使之成为线程安全的。
集合框架提供同步打包,这使得操作线程是安全的。
那么接下来,我们来看看如何使用collect接口。
stream的collect接口
不多说闲话直接上源代码。Stream.java中collect方法的句柄:
R,A R collect(收藏家?超级T,A,R收集者);
在这个实现方法中,参数是一个Collector对象,Collectors类的静态方法可以用来构造Collector对象,如Collectors.toList()、toSet()、toMap()等。API很容易找到,就不赘述了。
此外,如果我们想在collect接口中做更多的事情,我们需要自己实现Collector接口,我们需要实现以下方法:
SupplierA supplier();
BiConsumerA,T累加器();
binaryoperora combiner();
FunctionA,R finisher();
SetCharacteristics特征();
为了容易理解这三个参数,我们必须首先知道fork/join是如何工作的,一个图表涵盖了它:
上图来自:http://www.infoq.com/cn/articles/fork-join-introduction
简单地说就是大任务拆分成小任务,分别用不同线程去完成,然后把结果合并后返回。所以第一步是拆分,第二步是分开运算,第三步是合并。这三个步骤分别对应的就是收藏者的供应商,累加器和合并器。空谈是廉价的给我看代码,下面用一个例子来说明:
输入是一个10个整型数字的数组列表,通过计算转换成两倍类型的设置,首先定义一个计算组件:
计算。java:
公共类计算{
公共双重计算(整数){
return(double)(2 * num);
}
}
接下来在Main.java中定义输入的类型为数组列表的nums和类型为一组的输出结果结果:
private list整数nums=new ArrayList();
private SetDouble result=new HashSet();
定义转换目录的奔跑方法,实现收藏者接口,调用内部类容器中的方法,其中特征()方法返回空设置即可:
公共无效运行(){
//填充原始数据,nums中填充0-9 10个数
IntStream.range(0,10).forEach(nums:add);
//实现收藏者接口
result=nums.stream().平行()。collect(new CollectorInteger,Container,SetDouble() {
@覆盖
公共供应商容器供应商(){
返回容器*新
}
@覆盖
公共双消费者容器,整数累加器(){
返回容器*积累
}
@覆盖
公共BinaryOperatorContainer合并器(){
返回容器:合并;
}
@覆盖
public FunctionContainer,SetDouble finisher() {
返回容器*获取结果
}
@覆盖
公共集字符字符(){
//固定写法
返回收藏。空集();
}
});
}
构造内部类容器,该类的作用是一个存放输入的容器,定义了三个方法:
积聚方法对输入数据进行处理并存入本地的结果
结合方法将其他容器的结果合并到本地的结果中
获取结果方法返回本地的结果
容器。java:
类容器{
//定义本地的结果
公共集
公共容器(){
这个。set=new HashSet();
}
公共容器累积(整数){
这个。设置。添加(计算。计算(数字));
还这个;
}
公共集装箱联合收割机(集装箱集装箱){
这个。设置。addall(容器。设置);
还这个;
}
public SetDouble getResult() {
返回这一套
}
}
在Main.java中编写测试方法:
公共静态void main(String[] args) {
Main Main=new Main();
主要的。run();
System.out.println('原始数据:');
主要的。nums。foreach(I-系统。出去。打印(I ' '));
系统。出去。println(' \ n \ n集合方法加工后的数据:');
主要的。结果。foreach(I-系统。出去。打印(I ' '));
}
输出:
原始数据:
0 1 2 3 4 5 6 7 8 9
收集方法加工后的数据:
0.0 2.0 4.0 8.0 16.0 18.0 10.0 6.0 12.0 14.0
我们将10个整型数值的目录转成了10个两倍类型的设置,至此验证成功~
本程序参考http://blog.csdn.net/io_field/article/details/54971555。
一言蔽之
总结就是平行流里直接去修改变量是非线程安全的,但是采用收集和减少操作就是满足线程安全的了。
总结
以上就是这篇文章的全部内容了,希望本文的内容对大家的学习或者工作具有一定的参考学习价值,如果有疑问大家可以留言交流,谢谢大家对我们的支持。
郑重声明:本文由网友发布,不代表盛行IT的观点,版权归原作者所有,仅为传播更多信息之目的,如有侵权请联系,我们将第一时间修改或删除,多谢。