1、基本概念
多进程库提供了Pool类来实现简单的多进程任务。Pool类有以下方法:
- apply():直到得到结果之前一直阻塞。
- apply_async():这是apply()方法的一个变体,返回的是一个result对象。这是一个异步的操作,在所有的子类执行之前不会锁住主进程。
- map():这是内置的map函数的并行版本,在得到结果之前一直阻塞,此方法将可迭代的数据的每一个元素作为进程池的一个任务来执行。
- map_async():这是map的一个变体,返回一个result对象。如果指定了回调函数,回调函数应该是callable的,并且只接受一个参数。当result准备好时,会自动调用回调函数,除非调用失败。回调函数应该立即完成,否则,持有result的进程将被阻塞。
2、测试用例
创建四个进程池,然后使用map方法进行一个简单的计算。
import multiprocessing def function_square(data): result = data * data return result if __name__ == "__main__": inputs = list(range(100)) pool = multiprocessing.Pool(processes=4) pool_outputs = pool.map(function_square, inputs) pool.close() pool.join() print("pool: ", pool_outputs)
pool.map方法将一些独立的任务提交给进程池。pool.map和内置map的执行结果相同,但pool.map是通过多个并行进程计算的。
3、mpi4py模块
Python提供了很多MPI模块写并行程序。其中mpi4py在MPI-1/2顶层构建,提供了面向对象的接口,紧跟C++绑定的MPI-2。MPI是C语言用户可以无需学习新的接口就可以使用这个库。
此模块包含的主要的应用:
- 点对点通讯
- 集体通讯
- 拓扑
4、安装mpi4py
安装mpich:https://www.microsoft.com/en-us/download/confirmation.aspx?id=56727
下载并安装msmpisetup.exe
安装完成后安装目录如下:
将bin目录添加到系统环境中:
用cmd输入并显示如下即为安装成功
安装mpi4py
pip install mpi4py
MPI测试用例
from mpi4py import MPI def mpi_test(rank): print("I am rank %s" %rank) if __name__ == "__main__": comm = MPI.COMM_WORLD rank = comm.Get_rank() mpi_test(rank) print("Hello world from process", rank)
使用mpi运行文件
在MPI中,并行程序中不同进程用一个非负整数来区别,如果我们有P个进程,那么rank会从0到P-1分配。
MPI拿到rank的函数如下:rank = comm.Get_rank()
这个函数返回调用它的进程的rank,comm叫做交流者,用于区别不同的进程集合:comm = MPI.COMM_WORLD
5、MPI点对点通讯
MPI提供的最实用的一个特性是点对点通讯。两个不同的进程之间可以通过点对点通讯交换数据:一个进程是接收者,一个进程是发送者。
Python的mpi4py通过下面两个函数提供了点对点通讯功能:
- Comm.Send(data, process_destination):通过它在交流组中的排名来区分发送给不同进程的数据。
- Comm.Recv(process_source):接收来自源进程的数据,也是通过在交流组中的排名来分分的。
Comm变量表示交流着,定义了可以互相通讯的进程组:
comm = MKPI.COMM_WORLD
交换信息测试用例:
from mpi4py import MPI comm = MPI.COMM_WORLD rank = comm.rank print("My rank is :",rank) if rank == 0: data = 10000000 destination_process = 4 comm.send(data, dest=destination_process) print("sending data %s to process %d" %(data, destination_process)) if rank == 1: destination_process = 8 data = "hello,I am rank 1" comm.send(data, dest=destination_process) print("sending data %s to process %d" %(data, destination_process)) if rank == 4: data = comm.recv(source=0) print("data received is = %s" %data) if rank == 8: data1 = comm.recv(source=1) print("data received is = %s" %data1)
运行结果:
通过mpiexec -n 9运行9个互相通讯的进程,使用rank的值来区分每个进程。
整个过程分为两部分,发送者发送数据,接收者接收数据,二者必须都指定发送方/接收方,source=为指定发送者。如果有发送的数据没有被接收,程序会阻塞。
comm.send()和comm.recv()函数都是阻塞的函数,他们会一直阻塞调用者,直到数据使用完成,同时在MPI中,有两种方式发送和接收数据:
- buffer模式
- 同步模式
在buffer模式中,只要需要发送的数据被拷贝到buffer中,执行权就会交回到主程序,此时数据并非已经发送/接收完成。在同步模式中,只有函数真正的结束发送/接收任务之后才会返回。
6、避免死锁
mpi4py没有提供特定的功能来解决这种情况,但是提供了一些程序员必须遵守的规则来避免死锁的问题。
出现死锁的情况:
from mpi4py import MPI comm = MPI.COMM_WORLD rank = comm.rank print("my rank is :",rank) if rank == 1: data_send = "a" destination_process = 5 source_process = 5 data_received = comm.recv(source=source_process) comm.send(data_send, dest=destination_process) print("sending data %s to process %d" %(data_send, destination_process)) print("data received is = %s" %data_received) if rank == 5: data_send = "b" destination_process = 1 source_process = 1 data_received = comm.recv(source=source_process) comm.send(data_send, dest=destination_process) print("sending data %s to process %d" % (data_send, destination_process)) print("data received is = %s" % data_received)
运行结果:
进程1和进程5产生阻塞,程序阻塞。
此时两个进程都在等待对方,发生阻塞,因为recv和send都是阻塞的,两个函数都先使用的recv,所以调用者都在等待他们完成。所以讲上述代码改为如下即可解决阻塞:
from mpi4py import MPI comm = MPI.COMM_WORLD rank = comm.rank print("my rank is :",rank) if rank == 1: data_send = "a" destination_process = 5 source_process = 5 comm.send(data_send, dest=destination_process) data_received = comm.recv(source=source_process) print("sending data %s to process %d" %(data_send, destination_process)) print("data received is = %s" %data_received) if rank == 5: data_send = "b" destination_process = 1 source_process = 1 data_received = comm.recv(source=source_process) comm.send(data_send, dest=destination_process) print("sending data %s to process %d" % (data_send, destination_process)) print("data received is = %s" % data_received)
将其中一个函数的recv和send顺序调换。
运行结果:
也可通过Sendrecv函数解决,代码如下:
from mpi4py import MPI comm = MPI.COMM_WORLD rank = comm.rank print("my rank is :",rank) if rank == 1: data_send = "a" destination_process = 5 source_process = 5 # comm.send(data_send, dest=destination_process) # data_received = comm.recv(source=source_process) data_received = comm.sendrecv(data_send, dest=destination_process, source=source_process) print("sending data %s to process %d" %(data_send, destination_process)) print("data received is = %s" %data_received) if rank == 5: data_send = "b" destination_process = 1 source_process = 1 # data_received = comm.recv(source=source_process) # comm.send(data_send, dest=destination_process) data_received = comm.sendrecv(data_send, dest=destination_process, source=source_process) print("sending data %s to process %d" % (data_send, destination_process)) print("data received is = %s" % data_received)
运行结果:
7、集体通讯:Broadcast
在并行代码的开发中,会经常需要在多个进程间共享某个变量运行时的值,或操作多个进程提供的变量。MPI库提供了在多个进程之间交换信息的方法,将所有进程变成通讯者的这种方法叫做集体交流。因此,一个集体交流通常是2个以上的进程,也可以称为广播——一个进程将消息发送给其他进程。mpi4py模块通过以下方式提供广播的功能:
buf = comm.bcast(data_to_share, rank_of_root_process)
这个函数将root消息中包含的信息发送给属于comm通讯组其他的进程,每个进程必须通过相同的root和comm来调用它。
测试代码:
from mpi4py import MPI comm = MPI.COMM_WORLD rank = comm.Get_rank() if rank == 0: variable_to_share = 100 else: variable_to_share = None variable_to_share = comm.bcast(variable_to_share, root=0) print("process = %d variable shared = %d" %(rank, variable_to_share))
运行结果:
rank等于0的root进程初始化了一个变量,variable_to_share,值为100,然后声明了一个广播variable_to_share = comm.bcast(variable_to_share, root=0)
这个变量将通过通讯组发送给其他进程。
集体通讯允许组中的多个进程同时进行数据交流。在mpi4py模块中,只提供了阻塞版本的集体通讯(阻塞调用者,直到缓存中的数据全部安全发送。)
广泛应用的集体通讯应该是:
- 组中的进程提供通讯的屏障
- 通讯方式包括:
- 将一个进程的数据广播到组中其他进程中
- 从其他进程收集数据发给一个进程
- 从一个进程散播数据到其他进程中
- 减少操作
8、集体通讯:Scatter
scatter函数和广播很像,但是不同的是comm.bcast将相同的数据发送给所有在监听的进程,comm.scatter可以将数据放在数据中,发送给不同的进程。
comm.scatter函数接收一个array,根据进程的rank将其中的元素发给不同的进程,第一个元素发送给进程0,第二个元素发给进程1,以此类推。
测试用例:
from mpi4py import MPI comm = MPI.COMM_WORLD rank = comm.Get_rank() # array_to_share = ["a","b","c","d","e","f","g","h","i","j"] if rank == 0: array_to_share = [0,1,2,3,4,5,6,7,8,9] else: array_to_share = None recvbuf = comm.scatter(array_to_share, root=0) print("Process = %d recvbuf = %s" %(rank, recvbuf))
执行结果:
注意:列表中的元素个数,需要个进程保持一致。否则会出现如下错误。
9、集体通讯:gather
gather函数基本上是反向的scatter,即收集所有进程发送到root进程数据。方法如下:
recvbuf = comm.gather(sendbuf, rank_of_root_process)
sendbuf是要发送的数据,rank_of_root_process代表要接收数据的进程。
测试用例:
from mpi4py import MPI comm = MPI.COMM_WORLD size = comm.Get_size() # print(size) rank = comm.Get_rank() data = "process %s" %rank # print("start %s"%data) data = comm.gather(data, root=0) # print(data) if rank == 0: print("rank = %s receiving data to other process" %rank) for i in range(1, size): #data[i] = (i+1) ** 2 value = data[i] print("process %s receiving %s from process %s" %(rank, value, i)) # print(data)
执行结果:
10、使用Alltoall通讯
Alltoall集体通讯结合了scatter和gather的功能。在mpi4py中,有以下三类的Alltoall集体通讯。
- comm.Alltoall(sendbuf, recvbuf);
- comm.Alltoallv(sendbuf, recvbuf);
- comm.Alltoallw(sendbuf, recvbuf);
Alltoall测试用例:
from mpi4py import MPI import numpy comm = MPI.COMM_WORLD size = comm.Get_size() rank = comm.Get_rank() a_size = 1 # print("numpy arange: %s" %numpy.arange(size, dtype=int)) senddata = (rank+1)*numpy.arange(size, dtype=int) recvdata = numpy.empty(size * a_size, dtype=int) print("senddata is %s , recvdata is %s" %(senddata, recvdata)) # print("Recvdata is %s: , \n numpy.empty is %s" %(recvdata, numpy.empty(size * a_size, dtype=int))) comm.Alltoall(senddata, recvdata) print("process %s sending %s, receiving %s" %(rank, senddata, recvdata))
运行结果:
comm.alltoall方法将task j的sendbuf的第j个对象拷贝到task i中,recvbuf的第j个对象,一一对应。发送过程如图:
可以将左右两个方格看做xy轴,结果一一对应,如左图的(0,0)对应的值为0,其对应的有图的值为右图的(0,0)也为0。左图的3,4对应的值为16,右图(4,3)也为16。
P0包含的数据[0 1 2 3 4],它将值0赋值给自己,1传给进程P1,2传给进程P2,3传给进程P3,以此类推。
相同的P1的数据为[0 2 4 6 8] , 它将0传给P0,2传给P1,4传给P2,以此类推。
All-to-all定制通讯也叫全部交换,这种操作经常用于各种并发算法中,比如快速傅里叶变换,矩阵变换,样本排序以及一些数据库的 Join 操作。
11、简化操作
同comm.gather一样,comm.reduce接收一个数组,每一个元素是一个进程的输入,然后返回一个数组,每一个元素是进程的输出,返回给root进程。输出的元素包含了简化的结果。
简化定义如下:comm.Reduce(sendbuf, recvbuf, rank_of_root_process, op = type_of_reduction_operation)
这里需要注意的是,参数op和comm.gather不同,它代表你想应用在数据上的操作,mpi4py模块代表定义了一系列的简化操作,包括:
- MPI.MAX:返回最大的元素
- MPI.MIN:返回最小的元素
- MPI.SUM:对所有的元素相加
- MPI.PROD:对所有元素相乘
- MPI.LAND:对所有元素进行逻辑操作
- MPI.MAXLOC:返回最大值,以及拥有它的进程
- MPI.MINLOC:返回最小值,以及拥有它的进程
测试用例:
import numpy as np from mpi4py import MPI comm = MPI.COMM_WORLD size = comm.size rank = comm.rank array_size = 3 recvdata = np.zeros(array_size, dtype=np.int) senddata = (rank+1)*np.arange(size, dtype=np.int) print("+++++++++++++%s+++++++++++++%s++++++++++++" %(recvdata, senddata)) print("Process %s sending %s" %(rank, senddata)) comm.Reduce(senddata, recvdata, root=0, op=MPI.SUM) print("on task %s, after Reduce: data = %s" %(rank, recvdata))
执行结果:
MPI.SUM为求和操作,过程如下:
简化操作将每个task的第i个元素相加,然后放回到P0进程(root进程)的第i个元素中。