python读取mpp文件,

  python读取mpp文件,

  分享某计算机编程语言下的平均弹着点教程——并行编程的计算机编程语言介绍_wx62830f4b679a4的技术博客_博客

  接前文:

  分享某计算机编程语言下的平均弹着点教程——用MPI 1.0.2文档进行并行编程的计算机编程语言介绍之

  https://材料。jeremybejarano。com/MPIwithPython/collective com。超文本标记语言

  集体交流

  减少(…)和全部减少(…)例子:

  减少

  进口数量

  从mpi4py导入平均弹着点

  comm=MPI .通信_世界

  rank=comm.Get_rank()

  size=comm.Get_size()

  rankF=numpy.array(float(rank))

  如果秩==0:

  total=numpy.zeros(1)

  否则:

  总计=无

  comm.Reduce(rankF,total,op=MPI .最大)

  #comm.Reduce(rankF,total,op=MPI .总和)

  如果秩==0:

  打印(总计: ,总计)全部减少

  进口数量

  从mpi4py导入平均弹着点

  comm=MPI .通信_世界

  rank=comm.Get_rank()

  size=comm.Get_size()

  rankF=numpy.array(float(rank))

  total=numpy.zeros(1)

  comm.Allreduce(rankF,total,op=MPI .最大)

  打印(排名{}:总计{} 。格式(排名、总计))散点图

  # dotProductParallel_1.py

  # 奔跑语法示例:mpiexec-n 4 python 26 dotProductParallel _ 1。40000日元

  从mpi4py导入平均弹着点

  进口数量

  导入系统

  comm=MPI .通信_世界

  rank=comm.Get_rank()

  size=comm.Get_size()

  #从命令行读取

  # n=int(sys。argv[1])#向量的长度

  n=10000

  #任意示例向量,生成的向量被

  #为方便起见的流程

  x=numpy.linspace(0,100,n)如果通信等级==0否则无

  如果comm.rank==0,则y=numpy.linspace(20,300,n)否则无

  #初始化为数组数组

  dot=numpy.array([0 .])

  local_n=numpy.array([0],dtype=numpy.int32)

  #一致性测试

  如果秩==0:

  如果n!=y.size:

  打印("向量长度不匹配")

  通信中止()

  #目前,我们的程序不能处理大小不是被

  #处理器的数量

  如果n %大小!=0:

  打印(处理器的数量必须除以n . )

  通信中止()

  #原始向量中每个进程部分的长度

  local_n=numpy.array([n/size],dtype=numpy.int32)

  #向所有进程传达本地数组大小

  comm.Bcast(local_n,root=0)

  #初始化为数组数组

  local_x=numpy.zeros(local_n)

  local_y=numpy.zeros(local_n)

  #划分向量

  comm.Scatter(x,local_x,root=0)

  comm.Scatter(y,local_y,root=0)

  #点积的本地计算

  local _ dot=numpy。数组([numpy。dot(local _ x,local_y)]

  #将每个结果相加

  #comm.Reduce(local_dot,dot,op=MPI .总和)

  comm.Allreduce(local_dot,dot,op=MPI .总和)

  打印(点积为,点[0],并行计算)

  如果秩==0:

  #打印(点积为,点[0],并行计算)

  print(and ,numpy.dot(x,y),连续计算)

  散射体(…)和gatherev(…)#为了获得正确的性能,请使用3个进程无缓冲运行:

  # mpiexec-n 3 python 26 scratch。皮尤

  进口数量

  从mpi4py导入平均弹着点

  comm=MPI .通信_世界

  rank=comm.Get_rank()

  如果秩==0:

  x_global=numpy.linspace(0,100,11)

  否则:

  x_global=无

  如果秩==0:

  x_local=numpy.zeros(1)

   否则如果等级==1:

  x_local=numpy.zeros(1)

   否则如果等级==2:

  x_local=numpy.zeros(9)

  如果秩==0:

  打印("散布")

  comm.Scatterv([x_global,(1,1,9),(0,1,2),MPI .DOUBLE],x_local)

  print( process str(rank) has str(x _ local))

  通信障碍()

  如果秩==0:

  打印("收集")

  xGathered=numpy.zeros(11)

  否则:

  x聚集=无

  comm . gatherev(x _ local,[xGathered,(1,1,9),(0,1,2),MPI .双倍])

  print( process str(rank) has str(xGathered))该代码运行命令为:

  mpiexec -np 3 python x.py

  上个代码有个地方容易被忽视那就是函数通信散射v其实是非堵塞的,也就是说如果等级==0进程在执行该语句后不进行同步操作:通信障碍

  那么等级==0进程会继续向下执行而不会等待秩==1,秩==2进程完全接收数据到各自的变量x _本地中。

  给出修改的代码:

  #为了获得正确的性能,运行无缓冲的3个进程:

  # mpiexec-n 3 python 26 scratch。皮尤

  进口数量

  从mpi4py导入平均弹着点

  comm=MPI .通信_世界

  rank=comm.Get_rank()

  如果秩==0:

  x_global=numpy.linspace(0,100,11)

  否则:

  x_global=无

  如果秩==0:

  x_local=numpy.zeros(1)

   否则如果等级==1:

  x_local=numpy.zeros(1)

   否则如果等级==2:

  x_local=numpy.zeros(9)

  如果秩==0:

  打印("散布")

  如果排位!=0:

  导入时间

  时间。睡眠(10)

  comm.Scatterv([x_global,(1,1,9),(0,1,2),MPI .DOUBLE],x_local)

  print( process str(rank) has str(x _ local))

  #通信障碍()

  如果秩==0:

  打印("收集")

  xGathered=numpy.zeros(11)

  否则:

  x聚集=无

  comm . gatherev(x _ local,[xGathered,(1,1,9),(0,1,2),MPI .双倍])

  print( process str(rank) has str(xGathered))该代码执行后会先打印结果:

  分散

  进程0有[0。]

  聚集然后进入堵塞大致10秒时间,由此可以看到不进行通信障碍操作的通信散射v是非堵塞的,秩==0没有等待其他进程完全接收数据便向下执行了,但是从运行结果上我们可以看到收集操作通信收集v是堵塞的,也正因此等级==0进程会在此处进入堵塞10秒的状态。

  由于上面的代码后续运行中有堵塞操作了,因此没有通信障碍操作也不会有问题,不过对于平均弹着点中的非堵塞操作还是进行同步操作通信障碍操作以防万一的安全一些。

  该代码运行命令同样也为:

  mpiexec -np 3 python x.py

  ==================================================

  上面的代码其实是把进程数量硬编码进入代码里面了,如果运行不是-np 3而是其他数值则会报错,而这种编码方式是不妥的,因此不把进程数硬编码进去同时也能实现很好的计算负载是需要的。给出自己的工作:

  实现计算负载均衡的代码:

  #为了获得正确的性能,运行无缓冲的3个进程:

  # mpiexec-n 3 python 26 scratch。皮尤

  进口数量

  从mpi4py导入平均弹着点

  comm=MPI .通信_世界

  rank=comm.Get_rank()

  size=comm.Get_size()

  n=10000

  如果秩==0:

  x_global=numpy.linspace(0,100,n)

  否则:

  x_global=无

  n_local=numpy.zeros(size,dtype=numpy.int32)

  n_local[:]=n //size

  如果n %大小!=0:

  n_local[-(n%size):]=1

  begin_local=numpy.zeros(大小)

  对于范围内的我(1,大小):

  开始_本地[我]=开始_本地[i-1]n _本地[I-1]

  x _ local=numpy。零(n _ local[rank])

  #如果排位!=0:

  #导入时间

  #时间。睡眠(5)

  如果秩==0:

  打印("散布")

  comm.Scatterv([x_global,n_local,begin_local,MPI .DOUBLE],x_local)

  print( process str(rank) has str(x _ local[:5]))

  通信障碍()

  如果秩==0:

  打印("收集")

  xGathered=numpy.zeros(n)

  否则:

  x聚集=无

  comm . gatherev(x _ local,[xGathered,n_local,begin_local,MPI .双倍])

  print( process str(rank) has str(xGathered))

  运行命令:

  mpiexec -np 23 python x.py

  可以看到改进后的代码没有把总共运行的进程数硬编码到代码中,而是可以根据实际需要对任意数值下的总进程数实现计算负载均衡。改进代码根据总的运行进程数将计算服务均衡的划分给所有计算进程。

  ================================================

  以上代码运行命令如无特殊说明则为:

  mpiexec -np 8 python x.py

  减少(…)和全部减少(…)例子:

  减少

  进口数量

  从mpi4py导入平均弹着点

  comm=MPI .通信_世界

  rank=comm.Get_rank()

  size=comm.Get_size()

  rankF=numpy.array(float(rank))

  如果秩==0:

  total=numpy.zeros(1)

  否则:

  总计=无

  comm.Reduce(rankF,total,op=MPI .最大)

  #comm.Reduce(rankF,total,op=MPI .总和)

  如果秩==0:

  打印(合计: ,合计)

  Allreduce

  进口数量

  从mpi4py导入平均弹着点

  comm=MPI .通信_世界

  rank=comm.Get_rank()

  size=comm.Get_size()

  rankF=numpy.array(float(rank))

  total=numpy.zeros(1)

  comm.Allreduce(rankF,total,op=MPI .最大)

  打印(排名{}:总计{} 。格式(排名,总计))

  分散

  # dotProductParallel_1.py

  # 奔跑语法示例:mpiexec-n 4 python 26 dotProductParallel _ 1。40000日元

  从mpi4py导入平均弹着点

  进口数量

  导入系统

  comm=MPI .通信_世界

  rank=comm.Get_rank()

  size=comm.Get_size()

  #从命令行读取

  # n=int(sys。argv[1])#向量的长度

  n=10000

  #任意示例向量,生成的向量被

  #为方便起见的流程

  x=numpy.linspace(0,100,n)如果通信等级==0否则无

  如果comm.rank==0,则y=numpy.linspace(20,300,n)否则无

  #初始化为数组数组

  dot=numpy.array([0 .])

  local_n=numpy.array([0],dtype=numpy.int32)

  #一致性测试

  如果秩==0:

  如果n!=y.size:

  打印("向量长度不匹配")

  通信中止()

  #目前,我们的程序不能处理大小不是被

  #处理器的数量

  如果n %大小!=0:

  打印(处理器的数量必须除以n . )

  通信中止()

  #原始向量中每个进程部分的长度

  local_n=numpy.array([n/size],dtype=numpy.int32)

  #向所有进程传达本地数组大小

  comm.Bcast(local_n,root=0)

  #初始化为数组数组

  local_x=numpy.zeros(local_n)

  local_y=numpy.zeros(local_n)

  #划分向量

  comm.Scatter(x,local_x,root=0)

  comm.Scatter(y,local_y,root=0)

  #点积的本地计算

  local _ dot=numpy。数组([numpy。dot(local _ x,local_y)]

  #将每个结果相加

  #comm.Reduce(local_dot,dot,op=MPI .总和)

  comm.Allreduce(local_dot,dot,op=MPI .总和)

  打印(点积为,点[0],并行计算)

  如果秩==0:

  #打印(点积为,点[0],并行计算)

  print(and ,numpy.dot(x,y),连续计算)

  散射体(…)和gatherev(…)#为了获得正确的性能,请使用3个进程无缓冲运行:

  # mpiexec-n 3 python 26 scratch。皮尤

  进口数量

  从mpi4py导入平均弹着点

  comm=MPI .通信_世界

  rank=comm.Get_rank()

  如果秩==0:

  x_global=numpy.linspace(0,100,11)

  否则:

  x_global=无

  如果秩==0:

  x_local=numpy.zeros(1)

   否则如果等级==1:

  x_local=numpy.zeros(1)

   否则如果等级==2:

  x_local=numpy.zeros(9)

  如果秩==0:

  打印("散布")

  comm.Scatterv([x_global,(1,1,9),(0,1,2),MPI .DOUBLE],x_local)

  print( process str(rank) has str(x _ local))

  通信障碍()

  如果秩==0:

  打印("收集")

  xGathered=numpy.zeros(11)

  否则:

  x聚集=无

  comm . gatherev(x _ local,[xGathered,(1,1,9),(0,1,2),MPI .双倍])

  print( process str(rank) has str(xGathered))该代码运行命令为:

  mpiexec -np 3 python x.py

  上个代码有个地方容易被忽视那就是函数通信散射v其实是非堵塞的,也就是说如果等级==0进程在执行该语句后不进行同步操作:通信障碍

  那么等级==0进程会继续向下执行而不会等待秩==1,秩==2进程完全接收数据到各自的变量x _本地中。

  给出修改的代码:

  #为了获得正确的性能,运行无缓冲的3个进程:

  # mpiexec-n 3 python 26 scratch。皮尤

  进口数量

  从mpi4py导入平均弹着点

  comm=MPI .通信_世界

  rank=comm.Get_rank()

  如果秩==0:

  x_global=numpy.linspace(0,100,11)

  否则:

  x_global=无

  如果秩==0:

  x_local=numpy.zeros(1)

   否则如果等级==1:

  x_local=numpy.zeros(1)

   否则如果等级==2:

  x_local=numpy.zeros(9)

  如果秩==0:

  打印("散布")

  如果排位!=0:

  导入时间

  时间。睡眠(10)

  comm.Scatterv([x_global,(1,1,9),(0,1,2),MPI .DOUBLE],x_local)

  print( process str(rank) has str(x _ local))

  #通信障碍()

  如果秩==0:

  打印("收集")

  xGathered=numpy.zeros(11)

  否则:

  x聚集=无

  comm . gatherev(x _ local,[xGathered,(1,1,9),(0,1,2),MPI .双倍])

  print( process str(rank) has str(xGathered))

  该代码执行后会先打印结果:

  分散

  进程0有[0。]

  聚集然后进入堵塞大致10秒时间,由此可以看到不进行通信障碍操作的通信散射v是非堵塞的,秩==0没有等待其他进程完全接收数据便向下执行了,但是从运行结果上我们可以看到收集操作通信收集v是堵塞的,也正因此等级==0进程会在此处进入堵塞10秒的状态。

  上面代码的后续操作中有一个阻塞操作,所以没有comm.Barrier操作也不会有问题。但是,对于MPI中的非阻塞操作,为了以防万一,同步comm.Barrier操作更安全。

  代码运行命令也是:

  mpiexec -np 3 python x.py

  ==================================================

  上面的代码实际上将进程的数量硬编码到代码中。如果它运行其他值而不是-np 3,就会报错,这种编码方式是不合适的。因此,需要在不硬编码进程数量的情况下实现良好的计算负载。给自己一份工作:

  计算负载平衡的代码:

  #为了获得正确的性能,运行无缓冲的3个进程:

  # mpiexec-n 3 python 26 scratch . py-u

  进口数量

  从mpi4py导入mpi

  comm=MPI。通信_世界

  rank=comm.Get_rank()

  size=comm.Get_size()

  n=10000

  如果秩==0:

  x_global=numpy.linspace(0,100,n)

  否则:

  x_global=无

  n_local=numpy.zeros(size,dtype=numpy.int32)

  n_local[:]=n //size

  如果n %大小!=0:

  n_local[-(n%size):]=1

  begin_local=numpy.zeros(大小)

  对于范围内的I(1,大小):

  开始_本地[i]=开始_本地[i-1]n _本地[I-1]

  x _ local=numpy . zeros(n _ local[rank])

  #如果排位!=0:

  #导入时间

  #时间.睡眠(5)

  如果秩==0:

  打印(“散布”)

  comm.Scatterv([x_global,n_local,begin_local,MPI。DOUBLE],x_local)

  print( process str(rank) has str(x _ local[:5]))

  通信障碍()

  如果秩==0:

  打印(“收集”)

  xGathered=numpy.zeros(n)

  否则:

  x聚集=无

  comm . gatherev(x _ local,[xGathered,n_local,begin_local,MPI。双倍])

  print( process str(rank) has str(x collected))运行命令:

  mpiexec -np 23 python x.py

  可以看出,改进后的代码并没有将正在运行的进程总数硬编码到代码中,而是可以根据实际需要平衡进程总数在任意数值下的计算负荷。改进后的代码根据正在运行的进程总数将计算服务平均分配给所有计算进程。

  ================================================

  除非另有说明,以上代码运行命令为:

  mpiexec -np 8 python x.py

  转载请联系作者授权,否则将追究法律责任。

郑重声明:本文由网友发布,不代表盛行IT的观点,版权归原作者所有,仅为传播更多信息之目的,如有侵权请联系,我们将第一时间修改或删除,多谢。

留言与评论(共有 条评论)
   
验证码: