python多线程共享数据,python 多进程共享变量
引用:https://zhuanlan.zhihu.com/p/32513483
共享numpy数组需要numpy时,往往是数据量很大的场景,如果直接复制,会浪费很多内存。共享的numpy数组通过上一节的数组实现,然后用numpy.frombuffer和shape把共享内存封装成一个numpy数组。代码如下:
#编码:utf8
导入类型
导入操作系统
导入多重处理
将numpy作为np导入
NUM _ PROCESS=multi processing . CPU _ count()
定义工人(索引):
main _ NP array=NP . from buffer(shared _ array _ base,dtype=ctypes.c_double)
main _ NP array=main _ NP array . shape(NUM _ PROCESS,10)
pid=os.getpid()
main_nparray[index,=pid
返回pid
if __name__==__main__ :
shared_array_base=多重处理。数组(
ctypes.c_double,NUM_PROCESS * 10,lock=False)
池=多处理。池(进程数=进程数)
result=pool.map(worker,range(NUM_PROCESS))
main _ NP array=NP . from buffer(shared _ array _ base,dtype=ctypes.c_double)
main _ NP array=main _ NP array . shape(NUM _ PROCESS,10)
print( main_nparray)
运行结果:
===============================================================
当多个进程共享大数据时,比如numpy数组,我们需要利用多处理下的值和数组来实现多个进程的共享。然而,另一个重要的问题是数据的读写方式。由于CPython是在语言的数据结构上重新打包的,所以数据的读写需要翻译。也就是说,读写数据需要在Python数据类型下读写相应的C类数据结构,也正是因为这种数据读写方式,使得Python数据的操作比C类数据慢很多。
numpy数据的底层也是C型数据结构。同时,numpy下的很多数据操作可以直接在numpy下的底层数据结构上操作,节省了数据结构转换的时间和费用。只要不把numpy数据转换成Python数据,底层数据就可以直接在numpy下操作。
虽然mulprocessing模块提供了共享数据类型,但是当不同的进程读写共享数据时,数据类型会发生转换。
更直接的说,mutprocessing虽然提供了共享数据类型Value和Array,但是不同的进程不能直接操作它们。如果子进程要读写共享数据值和数组,需要将共享数据转换成可以操作的Python数据类型或者numpy数据类型,否则很难直接操作共享数据。这时numpy.frombuffer函数就派上用场了。numpy.frombuffer函数可以直接读取Python数据类型、numpy数据类型和共享数据类型的底层数据类型,即C数据类型。在这种情况下,使用numpy.frombuffer函数会省去数据类型转换的环节。Numpy.frombuffer可以直接读取共享数据类型Value和Array,因为Value和Array的底层实现是C数据类型。
下面是多进程共享数据读写方法的一些代码,以验证多进程大数据的最佳共享方法。
操作环境介绍:
软件:Ubuntu18.04系统,python3.7.5
硬件:英特尔i7-8700 cpu、6个物理内核和12个逻辑内核
1.使用multi processing . value/multi processing . array numpy . from buffer:
(使用numpy.frombuffer读写数据可以直接读写数据,无需类型转换)
导入类型
导入时间
导入多重处理
将numpy作为np导入
NUM _ PROCESS=multi processing . CPU _ count()
大小=1000000
定义工人(索引):
main _ NP array=NP . from buffer(shared _ array _ base[index],dtype=ctypes.c_double)
对于范围内的I(10000):
main_nparray[:]=索引I
回报指数
if __name__==__main__ :
shared_array_base=[]
for _ in范围(NUM_PROCESS):
shared_array_base.append(多重处理。数组( d ,size,lock=False))
池=多处理。池(进程数=进程数)
a=time.time()
result=pool.map(worker,range(NUM_PROCESS))
b=time.time()
打印(b-a)
#打印(结果)
对于范围内的I(进程号):
main _ NP数组=NP。from buffer(shared _ array _ base[I],dtype=ctypes.c_double)
print(main_nparray)
print(type(main_nparray))
print(main_nparray.shape)
# 73.216189146
# 73.2605750561
# 73.3307318687
# 73.4090409279
# 73.4219110012查看代码
运行时间:
73.216189146
73.2605750561
73.3307318687
73.4090409279
73.4219110012
运行过程中各进程中央处理器使用率情况:
可以看到在运行过程中12个子进程的使用率为100%,而主进程的使用率为0.3%,可以看到使用多重处理。价值/多重处理。数组numpy.frombuffer的方式各子进程在读写共享空间内容基本是不需要太多等待的,可以保证子进程基本以全速进行运行,而且主进程基本不参与计算(cpu利用率:0.3%)。也就是说采用该种方式各子进程操作父进程中的共享数据可以和操作自身进程空间内的数据达到基本一致的速度,为证明给出下面代码:
导入多重处理
将数组作为铭牌导入
NUM _ PROCESS=多重处理。CPU _ count()
大小=1000000
定义工人(索引):
main_nparray=np.zeros(size)
对于范围内的我(10000):
main_nparray[:]=索引我
回报指数
if __name__==__main__ :
池=多处理。池(进程数=进程数)
a=time.time()
result=pool.map(worker,range(NUM_PROCESS))
b=time.time()
打印(b-a)视图代码
运行时间:
73.0335
73.2103
73.1925
73.1642
73.2643
也就是说使用该种方式,各子进程对父进程中的共享内存的操作其性能与操作自己进程空间下内存基本一致,没有什么性能损耗。
===================================================================
2.使用多重处理。食槽方式:
(共享数据,隐式的、自动的进行数据类型的转换)
从多重处理导入流程,经理
导入多重处理
将数组作为铭牌导入
导入时间
NUM _ PROCESS=多重处理。CPU _ count()
大小=1000000
定义工人(索引):
shared _ array _ base[index][0]=NP。零(形状=大小)
shared_array_base[index][0]=索引
对于范围内的我(10000):
shared_array_base[index][0]=1
#1246.2459
#1226.7996
#1238.3933
#1241.1819
# print(共享阵列库[索引][0])
回报指数
if __name__==__main__ :
shared_array_base=[]
manager=Manager() #字典方式
对于_在范围(数字流程):
共享数组基础。追加(经理。dict())
池=多处理。池(进程数=进程数)
a=time.time()
result=pool.map(worker,range(NUM_PROCESS))
b=time.time()
打印(b - a)
对于范围内的索引(数字流程):
print(shared _ array _ base[索引][0])查看代码
运行时间:
1246.2459
1226.7996
1238.3933
1241.1819
1241.2889
运行时个子进程中央处理器利用率:
可以看到使用食槽的方式虽然可以实现共享内存的操作,但是该种方式需要父进程进行参与,随着子进程数量的增多父进程的负担也就越重,同时各子进程需要等待的时间也就越多。从运行时间上可以看到使用食槽的方式对共享内存进行操作整体性能下降了十多倍,其性能远低于值/数组numpy.frombuffer方式。
使用食槽方式对共享内存的操作是需要父进程参与的,这一点和值/数组方式不同,同时使用食槽方式各子进程需要对共享数据进行类型转换而这又进一步的影响性能表现。
如果上面代码修改为:
#编码:UTF-8
从多重处理导入流程,经理
导入多重处理
将数组作为铭牌导入
导入时间
NUM _ PROCESS=多重处理。CPU _ count()
# NUM _ PROCESS=1 # 125.9360 125.2017 118.3942 133.9661 99.3769 118.5580
大小=1000000
定义工人(索引):
shared _ array _ base[index][0]=NP。零(形状=大小)
shared_array_base[index][0]=索引
对于范围内的我(10000):
shared _ array _ base[index][0][:]=索引I #错误结果
# # # shared _ array _ base[index][0]=1
#1246.2459
#1226.7996
#1238.3933
#1241.1819
#1241.2889
# print(共享阵列库[索引][0])
回报指数
if __name__==__main__ :
shared_array_base=[]
manager=Manager() #字典方式
对于_在范围(数字流程):
共享数组基础。追加(经理。dict())
池=多处理。池(进程数=进程数)
a=time.time()
result=pool.map(worker,range(NUM_PROCESS))
b=time.time()
打印(b - a)
对于范围内的索引(数字流程):
print(shared _ array _ base[索引][0])查看代码
运行结果:
672.297180891037
[0.0.0.0.0.0.]
[1.1.1.1.1.1.]
[2.2.2.2.2.2.]
[3.3.3.3.3.3.]
[4.4.4.4.4.4.]
[5.5.5.5.5.5.]
[6.6.6.6.6.6.]
[7.7.7.7.7.7.]
[8.8.8.8.8.8.]
[9.9.9.9.9.9.]
[10.10.10.10.10.10.]
[11.11.11.11.11.11.]
运行时中央处理器使用率:
可以看到:
语句:
shared_array_base[index][0]=索引可以实现对共享内存的写操作。
语句:
shared _ array _ base[index][0][:]=索引I #错误结果并不能实现对共享内存的写操作。
而即使不对共享内存进行写操作其运行时间也只是缩减了一半,与值/数组numpy.frombuffer方式相比使用经理的方式操作共享内存即使是只进行读操作也是一种很耗费父进程计算资源的事情。使用经理的方式各子进程读取共享内存中的数据也是需要对共享数据进行格式转换的,而这部分工作也是需要父进程参与的,因此使用经理的方式并不能达到较好的性能表现。
如果使用单子进程的话:
代码:
#编码:UTF-8
从多重处理导入流程,经理
导入多重处理
将数组作为铭牌导入
导入时间
# NUM _ PROCESS=多重处理。CPU _ count()
NUM _ PROCESS=1 # 125.9360 125.2017 118.3942 133.9661 99.3769 118.5580
大小=1000000
定义工人(索引):
shared _ array _ base[index][0]=NP。零(形状=大小)
shared_array_base[index][0]=索引
对于范围内的我(10000):
shared_array_base[index][0]=1
#1246.2459
#1226.7996
#1238.3933
#1241.1819
#1241.2889
# print(共享阵列库[索引][0])
回报指数
if __name__==__main__ :
shared_array_base=[]
manager=Manager() #字典方式
对于_在范围(数字流程):
共享数组基础。追加(经理。dict())
池=多处理。池(进程数=进程数)
a=time.time()
result=pool.map(worker,range(NUM_PROCESS))
b=time.time()
打印(b - a)
对于范围内的索引(数字流程):
print(shared _ array _ base[索引][0])查看代码
运行时间:
125.9360
125.2017
118.3942
133.9661
99.3769
118.5580
123.2825
运行时中央处理器使用率:
这充分说明子进程对父进程的共享内存进行读写操作是需要父进程参与的,而这部分需要父进程参与的工作就是共享内存数据的类型转换工作。随着子进程数量的增加会加重父进程的负担,从而导致各子进程均难以获得较好的性能表现。
=========================================================================
3.只使用多重处理。价值/多重处理。排列方式:
(共享数据,使用中间数据进行操作后再直接赋值给共享数据:因为没有进行数据类型转换的共享数据难以直接进行读写操作)
代码:
将数组作为铭牌导入
导入多重处理
导入时间
导入类型
NUM _ PROCESS=多重处理。CPU _ count()
大小=1000000
定义工人(索引):
temp=shared _ array _ base[索引]
temp2=np.zeros(大小)索引
对于范围内的我(100 * 100):
温度2=1
温度[:]=温度2[:]
###2129.3716
###2253.2248
###2127.4056
###2128.4252
回报指数
if __name__==__main__ :
shared_array_base=[]
对于_在范围(数字流程):
shared_array_base.append(多重处理Array(ctypes.c_double,size,lock=False))
池=多处理。池(进程数=进程数)
a=time.time()
result=pool.map(worker,range(NUM_PROCESS))
b=time.time()
打印(b-a)
对于范围内的I(进程号):
main _ NP数组=NP。from buffer(shared _ array _ base[I],dtype=ctypes.c_double)
print(main_nparray)
print(type(main_nparray))
打印(main_nparray.shape)视图代码
运行时间:
2129.3716
2253.2248
2127.4056
2128.4252
运行时中央处理器使用率:
为估计该种形式下读写共享数据的耗时,给出下面代码:
将数组作为铭牌导入
导入多重处理
导入时间
导入类型
NUM _ PROCESS=多重处理。CPU _ count()
大小=1000000
定义工人(索引):
temp=shared _ array _ base[索引]
temp2=np.zeros(大小)索引
对于范围内的我(100 * 100):
温度2=1
# # #临时[:]=临时2[:]
###2129.3716
###2253.2248
###2127.4056
###2128.4252
回报指数
if __name__==__main__ :
shared_array_base=[]
对于_在范围(数字流程):
shared_array_base.append(多重处理Array(ctypes.c_double,size,lock=False))
池=多处理。池(进程数=进程数)
a=time.time()
result=pool.map(worker,range(NUM_PROCESS))
b=time.time()
打印(b-a)
对于范围内的I(进程号):
main _ NP数组=NP。from buffer(shared _ array _ base[I],dtype=ctypes.c_double)
print(main_nparray)
print(type(main_nparray))
打印(main_nparray.shape)视图代码
运行时间:
75.1454
75.0409
75.0613
74.9661
75.3398
74.6730
从上面的代码的运行时间上我们可以看出如果我们只使用多重处理。价值/多重处理。排列而不使用numpy.frombuffer函数的话也难以取得很好的性能表现,其中对共享数据的读写耗时大致需要2000秒以上,也就是说对共享数据的读操作如果不使用numpy.frombuffer的方式直接对数据底层进行操作,而是进行手动的类型转换,其耗时是巨大的,而如果使用nump。从缓冲区的方式对共享数据进行读写其耗时是几乎可以不计的。同时我们也可以看到使用多重处理。价值/多重处理。排列方式子进程对父进程的共享数据进行读写操作是不需要父进程参与的,其中读写共享数据时进行的数据类型转换的工作都是在子进程内进行的。
同时可以看到如果不使用numpy.frombuffer方式直接对共享数据底层操作而是进行数据转换的话,经理的方式要比手动转换性能高。
=================================================
最终结论就是在计算机编程语言中如果多进程对共享内存操作的话,最佳性能的实现是使用多重处理。价值/多重处理。数组numpy.frombuffer方式。
以上功能使用结构结构编写:
导入时间
导入类型
从指针导入*
导入多重处理
将数组作为铭牌导入
NUM _ PROCESS=多重处理。CPU _ count()
大小=1000000
类别测试(结构):
及格
测试range(NUM _ PROCESS)中I的. field _=[(str(I),c _ double *(size)]。
数据=多重处理Value(Test,lock=False) #全零初始化
定义工人(索引):
main _ NP数组=NP。from buffer(data, d ,size,index * size * sizeof(ctypes。c _ double))
对于范围内的我(10000):
#print(index,main_nparray)
main_nparray[:]=索引我
回报指数
池=多处理。池(进程数=进程数)
a=time.time()
result=pool.map(worker,range(NUM_PROCESS))
b=time.time()
打印(b - a)
nparray=np.frombuffer(data, d ,NUM_PROCESS*size,0)
nparray.resize(NUM_PROCESS,size)
打印(结果:)
#print(nparray.shape)
打印(nparray)
郑重声明:本文由网友发布,不代表盛行IT的观点,版权归原作者所有,仅为传播更多信息之目的,如有侵权请联系,我们将第一时间修改或删除,多谢。