,,关于Java8 parallelStream并发安全的深入讲解

,,关于Java8 parallelStream并发安全的深入讲解

本文主要介绍Java8 parallelStream并发安全的相关信息。通过示例代码非常详细,对大家的学习或者工作都有一定的参考价值。有需要的朋友就跟着下面的边肖学习吧。

背景

Java8的流接口大大降低了编写循环的复杂性。stream提供了map/reduce/collect等一系列聚合接口,还支持并发操作:parallelStream。

在爬虫开发过程中,经常会遇到遍历大型集合做重复操作的情况。这时候如果用串行执行的话会相当耗时,所以一般采用多线程来加速。Java8的ParalleStream提供了fork/join框架的并发执行能力。但如果使用不当,很容易陷入误区。

Java8的paralleStream是线程安全的吗

一个简单的例子,在下面的代码中,使用stream的forEach接口遍历1-10000,并分别插入到三个ArrayList中。第一个列表通过串行遍历插入,第二个使用并行流,第三个使用并行流并使用ReentryLock来同步插入列表的操作:

private static list integer list 1=new ArrayList();

private static list integer list 2=new ArrayList();

private static list integer list 3=new ArrayList();

私有静态锁Lock=new reentrant Lock();

公共静态void main(String[] args) {

IntStream.range(0,10000)。forEach(list 1:add);

IntStream.range(0,10000)。平行()。forEach(list 2:add);

IntStream.range(0,10000)。forEach(i - {

lock . lock();

尝试{

list 3 . add(I);

}最后{

lock . unlock();

}

});

System.out.println('串行执行的大小:' list 1 . size());

System.out.println('并行执行的大小:' list 2 . size());

System.out.println('锁的并行执行大小:' list 3 . Size());

}

执行结果:

串行执行的大小:10000

并行执行的规模:9595

锁并行执行的大小:10000

并且每个结果中并行执行的大小是不同的,而串行和锁定的结果总是正确的。显然,stream.parallel.forEach()中执行的操作不是线程安全的。

那么,既然paralleStream不是线程安全的,那么它里面的所有非原子操作都必须被锁定吗?我在stackOverflow上找到了答案:

https://code review . stack exchange . com/questions/60401/using-Java-8-parallel-streams

https://stack overflow . com/questions/22350288/parallel-streams-collectors-and-thread-safety

在对以上两个问题的回答中,证明了paralleStream的forEach接口不能保证同步,也提出了解决方案:使用collect和reduce接口。

http://docs . Oracle . com/javase/tutorial/collections/streams/parallelism . html

Javadoc中还引入了流的并发操作:

集合框架提供了同步包装器,它为任意集合添加了自动同步,使之成为线程安全的。

集合框架提供同步打包,这使得操作线程是安全的。

那么接下来,我们来看看如何使用collect接口。

stream的collect接口

不多说闲话直接上源代码。Stream.java中collect方法的句柄:

R,A R collect(收藏家?超级T,A,R收集者);

在这个实现方法中,参数是一个Collector对象,Collectors类的静态方法可以用来构造Collector对象,如Collectors.toList()、toSet()、toMap()等。API很容易找到,就不赘述了。

此外,如果我们想在collect接口中做更多的事情,我们需要自己实现Collector接口,我们需要实现以下方法:

SupplierA supplier();

BiConsumerA,T累加器();

binaryoperora combiner();

FunctionA,R finisher();

SetCharacteristics特征();

为了容易理解这三个参数,我们必须首先知道fork/join是如何工作的,一个图表涵盖了它:

上图来自:http://www.infoq.com/cn/articles/fork-join-introduction

简单地说就是大任务拆分成小任务,分别用不同线程去完成,然后把结果合并后返回。所以第一步是拆分,第二步是分开运算,第三步是合并。这三个步骤分别对应的就是收藏者的供应商,累加器和合并器。空谈是廉价的给我看代码,下面用一个例子来说明:

输入是一个10个整型数字的数组列表,通过计算转换成两倍类型的设置,首先定义一个计算组件:

计算。java:

公共类计算{

公共双重计算(整数){

return(double)(2 * num);

}

}

接下来在Main.java中定义输入的类型为数组列表的nums和类型为一组的输出结果结果:

private list整数nums=new ArrayList();

private SetDouble result=new HashSet();

定义转换目录的奔跑方法,实现收藏者接口,调用内部类容器中的方法,其中特征()方法返回空设置即可:

公共无效运行(){

//填充原始数据,nums中填充0-9 10个数

IntStream.range(0,10).forEach(nums:add);

//实现收藏者接口

result=nums.stream().平行()。collect(new CollectorInteger,Container,SetDouble() {

@覆盖

公共供应商容器供应商(){

返回容器*新

}

@覆盖

公共双消费者容器,整数累加器(){

返回容器*积累

}

@覆盖

公共BinaryOperatorContainer合并器(){

返回容器:合并;

}

@覆盖

public FunctionContainer,SetDouble finisher() {

返回容器*获取结果

}

@覆盖

公共集字符字符(){

//固定写法

返回收藏。空集();

}

});

}

构造内部类容器,该类的作用是一个存放输入的容器,定义了三个方法:

积聚方法对输入数据进行处理并存入本地的结果

结合方法将其他容器的结果合并到本地的结果中

获取结果方法返回本地的结果

容器。java:

类容器{

//定义本地的结果

公共集

公共容器(){

这个。set=new HashSet();

}

公共容器累积(整数){

这个。设置。添加(计算。计算(数字));

还这个;

}

公共集装箱联合收割机(集装箱集装箱){

这个。设置。addall(容器。设置);

还这个;

}

public SetDouble getResult() {

返回这一套

}

}

在Main.java中编写测试方法:

公共静态void main(String[] args) {

Main Main=new Main();

主要的。run();

System.out.println('原始数据:');

主要的。nums。foreach(I-系统。出去。打印(I ' '));

系统。出去。println(' \ n \ n集合方法加工后的数据:');

主要的。结果。foreach(I-系统。出去。打印(I ' '));

}

输出:

原始数据:

0 1 2 3 4 5 6 7 8 9

收集方法加工后的数据:

0.0 2.0 4.0 8.0 16.0 18.0 10.0 6.0 12.0 14.0

我们将10个整型数值的目录转成了10个两倍类型的设置,至此验证成功~

本程序参考http://blog.csdn.net/io_field/article/details/54971555。

一言蔽之

总结就是平行流里直接去修改变量是非线程安全的,但是采用收集和减少操作就是满足线程安全的了。

总结

以上就是这篇文章的全部内容了,希望本文的内容对大家的学习或者工作具有一定的参考学习价值,如果有疑问大家可以留言交流,谢谢大家对我们的支持。

郑重声明:本文由网友发布,不代表盛行IT的观点,版权归原作者所有,仅为传播更多信息之目的,如有侵权请联系,我们将第一时间修改或删除,多谢。

留言与评论(共有 条评论)
   
验证码: