Python并行编程(十三):进程池和mpi4py模块

论坛 期权论坛 脚本     
已经匿名di用户   2022-5-29 19:31   1074   0

1、基本概念

多进程库提供了Pool类来实现简单的多进程任务。Pool类有以下方法:

- apply():直到得到结果之前一直阻塞。

- apply_async():这是apply()方法的一个变体,返回的是一个result对象。这是一个异步的操作,在所有的子类执行之前不会锁住主进程。

- map():这是内置的map函数的并行版本,在得到结果之前一直阻塞,此方法将可迭代的数据的每一个元素作为进程池的一个任务来执行。

- map_async():这是map的一个变体,返回一个result对象。如果指定了回调函数,回调函数应该是callable的,并且只接受一个参数。当result准备好时,会自动调用回调函数,除非调用失败。回调函数应该立即完成,否则,持有result的进程将被阻塞。

2、测试用例

创建四个进程池,然后使用map方法进行一个简单的计算。

import multiprocessing

def function_square(data):
    result = data * data
    return result

if __name__ == "__main__":
    inputs = list(range(100))
    pool = multiprocessing.Pool(processes=4)
    pool_outputs = pool.map(function_square, inputs)
    pool.close()
    pool.join()
    print("pool: ", pool_outputs)

pool.map方法将一些独立的任务提交给进程池。pool.map和内置map的执行结果相同,但pool.map是通过多个并行进程计算的。

3、mpi4py模块

Python提供了很多MPI模块写并行程序。其中mpi4py在MPI-1/2顶层构建,提供了面向对象的接口,紧跟C++绑定的MPI-2。MPI是C语言用户可以无需学习新的接口就可以使用这个库。

此模块包含的主要的应用:

- 点对点通讯

- 集体通讯

- 拓扑

4、安装mpi4py

安装mpich:https://www.microsoft.com/en-us/download/confirmation.aspx?id=56727

下载并安装msmpisetup.exe

安装完成后安装目录如下:

将bin目录添加到系统环境中:

用cmd输入并显示如下即为安装成功

安装mpi4py

pip install mpi4py

MPI测试用例

from mpi4py import MPI

def mpi_test(rank):
    print("I am rank %s" %rank)


if __name__ == "__main__":

    comm = MPI.COMM_WORLD
    rank = comm.Get_rank()
    mpi_test(rank)
    print("Hello world from process", rank)

使用mpi运行文件

在MPI中,并行程序中不同进程用一个非负整数来区别,如果我们有P个进程,那么rank会从0到P-1分配。

MPI拿到rank的函数如下:rank = comm.Get_rank()

这个函数返回调用它的进程的rank,comm叫做交流者,用于区别不同的进程集合:comm = MPI.COMM_WORLD

5、MPI点对点通讯

MPI提供的最实用的一个特性是点对点通讯。两个不同的进程之间可以通过点对点通讯交换数据:一个进程是接收者,一个进程是发送者。

Python的mpi4py通过下面两个函数提供了点对点通讯功能:

- Comm.Send(data, process_destination):通过它在交流组中的排名来区分发送给不同进程的数据。

- Comm.Recv(process_source):接收来自源进程的数据,也是通过在交流组中的排名来分分的。

Comm变量表示交流着,定义了可以互相通讯的进程组:

comm = MKPI.COMM_WORLD

交换信息测试用例:

from mpi4py import MPI

comm = MPI.COMM_WORLD
rank = comm.rank
print("My rank is :",rank)

if rank == 0:
    data = 10000000
    destination_process = 4
    comm.send(data, dest=destination_process)
    print("sending data %s to process %d" %(data, destination_process))

if rank == 1:
    destination_process = 8
    data = "hello,I am rank 1"

    comm.send(data, dest=destination_process)
    print("sending data %s to process %d" %(data, destination_process))

if rank == 4:
    data = comm.recv(source=0)
    print("data received is = %s" %data)

if rank == 8:
    data1 = comm.recv(source=1)
    print("data received is = %s" %data1)

运行结果:

通过mpiexec -n 9运行9个互相通讯的进程,使用rank的值来区分每个进程。

整个过程分为两部分,发送者发送数据,接收者接收数据,二者必须都指定发送方/接收方,source=为指定发送者。如果有发送的数据没有被接收,程序会阻塞。

comm.send()和comm.recv()函数都是阻塞的函数,他们会一直阻塞调用者,直到数据使用完成,同时在MPI中,有两种方式发送和接收数据:

- buffer模式

- 同步模式

在buffer模式中,只要需要发送的数据被拷贝到buffer中,执行权就会交回到主程序,此时数据并非已经发送/接收完成。在同步模式中,只有函数真正的结束发送/接收任务之后才会返回。

6、避免死锁

mpi4py没有提供特定的功能来解决这种情况,但是提供了一些程序员必须遵守的规则来避免死锁的问题。

出现死锁的情况:

from mpi4py import MPI

comm = MPI.COMM_WORLD
rank = comm.rank
print("my rank is :",rank)

if rank == 1:
    data_send = "a"
    destination_process = 5
    source_process = 5
    data_received = comm.recv(source=source_process)
    comm.send(data_send, dest=destination_process)

    print("sending data %s to process %d" %(data_send, destination_process))
    print("data received is = %s" %data_received)

if rank == 5:
    data_send = "b"
    destination_process = 1
    source_process = 1
    data_received = comm.recv(source=source_process)
    comm.send(data_send, dest=destination_process)
    print("sending data %s to process %d" % (data_send, destination_process))
    print("data received is = %s" % data_received)

运行结果:

进程1和进程5产生阻塞,程序阻塞。

此时两个进程都在等待对方,发生阻塞,因为recv和send都是阻塞的,两个函数都先使用的recv,所以调用者都在等待他们完成。所以讲上述代码改为如下即可解决阻塞:

from mpi4py import MPI

comm = MPI.COMM_WORLD
rank = comm.rank
print("my rank is :",rank)

if rank == 1:
    data_send = "a"
    destination_process = 5
    source_process = 5
    comm.send(data_send, dest=destination_process)
    data_received = comm.recv(source=source_process)

    print("sending data %s to process %d" %(data_send, destination_process))
    print("data received is = %s" %data_received)

if rank == 5:
    data_send = "b"
    destination_process = 1
    source_process = 1
    data_received = comm.recv(source=source_process)
    comm.send(data_send, dest=destination_process)
    print("sending data %s to process %d" % (data_send, destination_process))
    print("data received is = %s" % data_received)

将其中一个函数的recv和send顺序调换。

运行结果:

也可通过Sendrecv函数解决,代码如下:

from mpi4py import MPI

comm = MPI.COMM_WORLD
rank = comm.rank
print("my rank is :",rank)

if rank == 1:
    data_send = "a"
    destination_process = 5
    source_process = 5
    # comm.send(data_send, dest=destination_process)
    # data_received = comm.recv(source=source_process)
    data_received = comm.sendrecv(data_send, dest=destination_process, source=source_process)

    print("sending data %s to process %d" %(data_send, destination_process))
    print("data received is = %s" %data_received)

if rank == 5:
    data_send = "b"
    destination_process = 1
    source_process = 1
    # data_received = comm.recv(source=source_process)
    # comm.send(data_send, dest=destination_process)
    data_received = comm.sendrecv(data_send, dest=destination_process, source=source_process)
    print("sending data %s to process %d" % (data_send, destination_process))
    print("data received is = %s" % data_received)

运行结果:

7、集体通讯:Broadcast

在并行代码的开发中,会经常需要在多个进程间共享某个变量运行时的值,或操作多个进程提供的变量。MPI库提供了在多个进程之间交换信息的方法,将所有进程变成通讯者的这种方法叫做集体交流。因此,一个集体交流通常是2个以上的进程,也可以称为广播——一个进程将消息发送给其他进程。mpi4py模块通过以下方式提供广播的功能:

buf = comm.bcast(data_to_share, rank_of_root_process)

这个函数将root消息中包含的信息发送给属于comm通讯组其他的进程,每个进程必须通过相同的root和comm来调用它。

测试代码:

from mpi4py import MPI

comm = MPI.COMM_WORLD
rank = comm.Get_rank()

if rank == 0:
    variable_to_share = 100
else:
    variable_to_share = None

variable_to_share = comm.bcast(variable_to_share, root=0)
print("process = %d  variable shared = %d" %(rank, variable_to_share))

运行结果:

rank等于0的root进程初始化了一个变量,variable_to_share,值为100,然后声明了一个广播variable_to_share = comm.bcast(variable_to_share, root=0)

这个变量将通过通讯组发送给其他进程。

集体通讯允许组中的多个进程同时进行数据交流。在mpi4py模块中,只提供了阻塞版本的集体通讯(阻塞调用者,直到缓存中的数据全部安全发送。)

广泛应用的集体通讯应该是:

- 组中的进程提供通讯的屏障

- 通讯方式包括:

- 将一个进程的数据广播到组中其他进程中

- 从其他进程收集数据发给一个进程

- 从一个进程散播数据到其他进程中

- 减少操作

8、集体通讯:Scatter

scatter函数和广播很像,但是不同的是comm.bcast将相同的数据发送给所有在监听的进程,comm.scatter可以将数据放在数据中,发送给不同的进程。

comm.scatter函数接收一个array,根据进程的rank将其中的元素发给不同的进程,第一个元素发送给进程0,第二个元素发给进程1,以此类推。

测试用例:

from mpi4py import MPI

comm = MPI.COMM_WORLD
rank = comm.Get_rank()

# array_to_share = ["a","b","c","d","e","f","g","h","i","j"]
if rank == 0:
    array_to_share = [0,1,2,3,4,5,6,7,8,9]
else:
    array_to_share = None

recvbuf = comm.scatter(array_to_share, root=0)
print("Process = %d  recvbuf = %s" %(rank, recvbuf))

执行结果:

注意:列表中的元素个数,需要个进程保持一致。否则会出现如下错误。

9、集体通讯:gather

gather函数基本上是反向的scatter,即收集所有进程发送到root进程数据。方法如下:

recvbuf = comm.gather(sendbuf, rank_of_root_process)

sendbuf是要发送的数据,rank_of_root_process代表要接收数据的进程。

测试用例:

from mpi4py import MPI


comm = MPI.COMM_WORLD
size = comm.Get_size()
# print(size)
rank = comm.Get_rank()
data = "process %s" %rank
# print("start %s"%data)
data = comm.gather(data, root=0)
# print(data)
if rank == 0:
    print("rank = %s receiving data to other process" %rank)
    for i in range(1, size):
        #data[i] = (i+1) ** 2
        value = data[i]
        print("process %s receiving %s from process %s" %(rank, value, i))
    # print(data)

执行结果:

10、使用Alltoall通讯

Alltoall集体通讯结合了scatter和gather的功能。在mpi4py中,有以下三类的Alltoall集体通讯。

- comm.Alltoall(sendbuf, recvbuf);

- comm.Alltoallv(sendbuf, recvbuf);

- comm.Alltoallw(sendbuf, recvbuf);

Alltoall测试用例:

from mpi4py import MPI
import numpy

comm = MPI.COMM_WORLD
size = comm.Get_size()
rank = comm.Get_rank()

a_size = 1

# print("numpy arange: %s" %numpy.arange(size, dtype=int))
senddata = (rank+1)*numpy.arange(size, dtype=int)
recvdata = numpy.empty(size * a_size, dtype=int)
print("senddata is %s , recvdata is %s" %(senddata, recvdata))
# print("Recvdata is %s: , \n numpy.empty is %s" %(recvdata, numpy.empty(size * a_size, dtype=int)))

comm.Alltoall(senddata, recvdata)
print("process %s sending %s, receiving %s" %(rank, senddata, recvdata))

运行结果:

comm.alltoall方法将task j的sendbuf的第j个对象拷贝到task i中,recvbuf的第j个对象,一一对应。发送过程如图:

可以将左右两个方格看做xy轴,结果一一对应,如左图的(0,0)对应的值为0,其对应的有图的值为右图的(0,0)也为0。左图的3,4对应的值为16,右图(4,3)也为16。

P0包含的数据[0 1 2 3 4],它将值0赋值给自己,1传给进程P1,2传给进程P2,3传给进程P3,以此类推。

相同的P1的数据为[0 2 4 6 8] , 它将0传给P0,2传给P1,4传给P2,以此类推。

All-to-all定制通讯也叫全部交换,这种操作经常用于各种并发算法中,比如快速傅里叶变换,矩阵变换,样本排序以及一些数据库的 Join 操作。

11、简化操作

同comm.gather一样,comm.reduce接收一个数组,每一个元素是一个进程的输入,然后返回一个数组,每一个元素是进程的输出,返回给root进程。输出的元素包含了简化的结果。

简化定义如下:comm.Reduce(sendbuf, recvbuf, rank_of_root_process, op = type_of_reduction_operation)

这里需要注意的是,参数op和comm.gather不同,它代表你想应用在数据上的操作,mpi4py模块代表定义了一系列的简化操作,包括:

- MPI.MAX:返回最大的元素

- MPI.MIN:返回最小的元素

- MPI.SUM:对所有的元素相加

- MPI.PROD:对所有元素相乘

- MPI.LAND:对所有元素进行逻辑操作

- MPI.MAXLOC:返回最大值,以及拥有它的进程

- MPI.MINLOC:返回最小值,以及拥有它的进程

测试用例:

import numpy as np
from mpi4py import MPI

comm = MPI.COMM_WORLD
size = comm.size
rank = comm.rank
array_size = 3
recvdata = np.zeros(array_size, dtype=np.int)
senddata = (rank+1)*np.arange(size, dtype=np.int)
print("+++++++++++++%s+++++++++++++%s++++++++++++" %(recvdata, senddata))
print("Process %s sending %s" %(rank, senddata))
comm.Reduce(senddata, recvdata, root=0, op=MPI.SUM)
print("on task %s, after Reduce: data = %s" %(rank, recvdata))

执行结果:

MPI.SUM为求和操作,过程如下:

简化操作将每个task的第i个元素相加,然后放回到P0进程(root进程)的第i个元素中。

转载于:https://www.cnblogs.com/dukuan/p/9811057.html

分享到 :
0 人收藏
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

积分:81
帖子:4969
精华:0
期权论坛 期权论坛
发布
内容

下载期权论坛手机APP