3 Multiprocessing及mpi4py
在本章中,我们将开始并行编程的工作。事实上,在 Python 中,只有进程可以同时运行,并行计算才能带来真正的好处。在 Python 中,有两种主要的方法可以在并行编程中实现进程:标准库中的多进程模块和 mpi4py 库,后者也为 Python 语言扩展了 MPI 协议。多进程实现了共享内存编程范式,程序中的进程可以访问一个公共内存区域。与线程一样,这种方法会产生竞赛条件和同步问题。为此,我们将引入由同一多进程模块提供的两个进程间通信通道: 队列(Queue)和管道(Pipe)。这将允许并行进程完全同步地交换数据,而不会出现共享内存的问题。另一方面,mpi4py 库基于消息传递模式。在这种模式下,没有共享资源,进程之间的所有通信都是通过进程交换进行的。
主要内容:
多进程模块
作为类和子类的进程
通信渠道–队列和管道
进程池
进程池执行器
Mpi4py 库
点对点通信
集体通信
拓扑结构
3.1 进程和Multiprocessing模块
在 Python 中,并行编程的真正主角是进程,只有进程才能真正同时执行代码。这是因为,正如我们在前面章节中所看到的,Python 中的线程不能并行操作,我们最多只能用它们实现并发行为。
因此,在 Python 中进程的编程是非常重要的,以至于我们在标准库中有一个完全专用于进程的模块:multiprocessing。
该模块提供了在程序中创建和管理进程的所有可能操作,其方式与线程模块处理线程的方式非常相似。实现过程中使用的指令和构造几乎完全相同:
Process() 构造函数允许创建进程。
调用 start() 方法可以启动进程活动。
调用 join() 方法会导致程序(主进程)的执行等待所有并行启动的进程执行完毕。
进程的生命周期基于三种状态:
就绪(Ready)
运行中(Running)
等待(Waiting)
然后,多进程模块按以下方式构建并行编程。主进程是开始执行程序的进程。在主进程中,将使用 Process() 构造函数定义多个进程。此时,刚刚定义的进程处于就绪状态。然后在某一时刻,程序的某一部分开始并行执行。通过调用每个进程的 start() 方法,所有已定义的进程(子进程)将一起被激活。主进程(在这种情况下也是父进程)将继续异步执行,而不会等待子进程的执行:
如果我们需要一种同步机制,并希望主进程等待其子进程的执行结果,我们将为每个进程调用 join() 方法:
就像处理线程模块的线程一样,我们用进程来编写代码。正如您所看到的,线程和进程的 API 设计具有很好的一致性。
并行激活的每个进程都将执行目标函数指定的某些操作。这些进程可以全部执行相同的操作,也可以分别执行不同的操作。事实上,每个进程的目标函数都是通过在构造函数中传递其名称作为目标参数来指定的。
为了更好地理解刚才解释的概念,让我们以下面的代码为例:
import multiprocessing
import time
def function(i):
print (“start Process %i” %i)
time.sleep(2)
print (“end Process %i” %i)
return
if name == ‘main‘:
p1 = multiprocessing.Process(target=function, args=(1,))
p2 = multiprocessing.Process(target=function, args=(2,))
p3 = multiprocessing.Process(target=function, args=(3,))
p4 = multiprocessing.Process(target=function, args=(4,))
p5 = multiprocessing.Process(target=function, args=(5,))
p1.start()
p2.start()
p3.start()
p4.start()
p5.start()
p1.join()
p2.join()
p3.join()
p4.join()
p5.join()
print(“END Program”)
从代码中我们可以看到,通过 Process() 构造函数并行定义了五个进程。它们都将执行相同的目标函数,为简单起见,我们在此将其称为 function。如果函数需要一些参数,也可以将它们作为一个元组传递给 Process() 构造函数的 args 参数。在代码中,我们需要传递一个数字来标识函数中运行的进程。所有这些进程都将通过 start() 方法激活,然后通过 join() 方法与主进程同步。
运行代码后,我们将得到类似下面的结果:
start Process 1
start Process 2
start Process 3
start Process 4
start Process 5
end Process 1
end Process 2
end Process 3
end Process 4
end Process 5
END Program
我们在前一个案例中采用的并行代码实现方式非常易读,所有指令都单独明确地表达出来。由于需要并行使用五个进程,我们为构造函数定义了五行,用 start 方法激活它们定义了五行,用 join() 方法使它们与主进程同步定义了五行。但是,如果进程的数量增加到更多,用这种方法执行代码就会很麻烦。
有许多更有效的结构,它们利用迭代和其他机制来概括前面的所有步骤。修改前面代码的例子如下:
import multiprocessing
import time
def function(i):
print (“start Process %i” %i)
time.sleep(2)
print (“end Process %i” %i)
return
if name == ‘main‘:
processes = []
n_procs = 5
for i in range(n_procs):
p = multiprocessing.Process(target=function, args=(i,))
processes.append(p)
p.start()
for i in range(n_procs):
processes[i].join()
print(“END Program”)
3.1.1 使用进程 ID
在前面的例子中,我们在 Process() 构造函数中使用了参数通道,通过迭代器 i 来传递进程编号。还有另一种方法可以识别运行中的进程,那就是进程 ID(PID)。这是一个非常简单的操作,我们导入标准库中的 os 模块,该模块提供了 getpid() 函数,通过该函数我们可以获得每个运行进程的 PID。
我们对之前的代码做如下修改,用进程 ID 代替数字,如下代码所示:
import multiprocessing
import os
import time
def function():
pid = os.getpid()
print (“start Process %s” %pid)
time.sleep(2)
print (“end Process %s” %pid)
return
if name == ‘main‘:
processes = []
n_procs = 5
for i in range(n_procs):
p = multiprocessing.Process(target=function)
processes.append(p)
p.start()
for i in range(n_procs):
processes[i].join()
print(“END Program”)
可以看到,Process() 构造函数不再需要使用 args 作为参数,因为函数本身会处理它。事实上,在该构造函数中调用了 getpid()函数,它将返回执行该函数的进程的 PID。我们将从同一个函数中获得五个不同的 PID,而无需使用从主进程传递的参数。
如果我们运行这段代码,会得到类似下面的结果:
start Process 31268
start Process 78792
start Process 99072
start Process 26564
start Process 57956
end Process 31268
end Process 78792
end Process 99072
end Process 26564
end Process 57956
END Program
从结果中我们可以看到,这次我们得到的不是累进数字,而是可以识别并行运行的不同进程的 PID。这些 PID 与操作系统识别的 PID 相同。因此,在 Python 解释器的范围之外,也可以通过其他工具或应用程序来监控资源消耗或管理活动进程。
3.1.2 进程池
共同管理多个进程的编程模式的进一步发展是使用进程池,在 Python 中用 multiprocessing.Pool 类表示。
进程池是负责管理一定数量进程的对象。通过它,你可以控制进程从创建到使用的状态,以及是否应该暂停其中一些进程以节省计算资源。因此,multiprocessing.Pool 不过是利用一组进程临时执行特定任务的接口,而无需指定有多少个进程和哪些进程必须执行该任务。这样,我们就可以大大简化代码,使其更具可读性。
让我们将之前的代码转换为 multiprocessing.Pool 代码:
import multiprocessing
import time
def function(i):
process = multiprocessing.current_process()
print (“start Process %i(pid:%s)” %(i,process.pid))
time.sleep(2)
print (“end Process %i(pid:%s)” %(i,process.pid))
return
if name == ‘main‘:
pool = multiprocessing.Pool()
print(“Processes started: %s” %pool._processes)
for i in range(pool._processes):
results = pool.apply(function, args=(i,))
pool.close()
print(“END Program”)
pool = multiprocessing.pool.Pool()创建进程池,使用与系统逻辑 CPU 内核数量相匹配的工作进程。在代码中可通过 pool._processes 访问该数量。然后使用 pool.apply(),让进程池执行由 function() 函数表示的给定任务,并将通过 args 生成的进程数量作为参数传递给进程池。代码中要求的所有任务执行完毕后,将使用 pool.close() 函数关闭池。
至于 function() 函数,我们添加了进程的 PID,以便能明确识别。要获取这个值,首先要使用 current_process()获取正在运行的进程,然后在此基础上调用 pid 属性。
运行刚才编写的代码,我们将得到类似下面的结果:
Processes started: 16
start Process 0(pid:1668)
end Process 0(pid:1668)
start Process 1(pid:78336)
end Process 1(pid:78336)
start Process 2(pid:100832)
end Process 2(pid:100832)
start Process 3(pid:104912)
end Process 3(pid:104912)
start Process 4(pid:50536)
end Process 4(pid:50536)
start Process 5(pid:70524)
end Process 5(pid:70524)
start Process 6(pid:54460)
end Process 6(pid:54460)
start Process 7(pid:78740)
end Process 7(pid:78740)
start Process 8(pid:26464)
end Process 8(pid:26464)
start Process 9(pid:10632)
end Process 9(pid:10632)
start Process 10(pid:106384)
end Process 10(pid:106384)
start Process 11(pid:94768)
end Process 11(pid:94768)
start Process 12(pid:78340)
end Process 12(pid:78340)
start Process 13(pid:75256)
end Process 13(pid:75256)
start Process 14(pid:106744)
end Process 14(pid:106744)
start Process 15(pid:46304)
end Process 15(pid:46304)
END Program
从结果中我们可以看到,前 12 个 Worker 是根据执行代码的计算机的内核数创建的(不同计算机的内核数可能不同)。然后,每个 Worker 按照迭代规定的步骤执行调用的函数。从结果可以看出,每个任务都是按顺序执行的。稍后我们将看到如何使用 map() 函数并行运行进程池,从而充分利用进程池。
为了更好地理解进程池的工作原理,让我们做一个小小的改动。在调用 Pool() 时,也可以确定池内进程的固定数量,而与 CPU 内核无关。在构造函数中,将使用进程参数定义成为池一部分的进程数:
pool = multiprocessing.Pool(processes=4)
我们对之前的代码进行了适当修改,替换了 Pool() 的定义,并在 for 循环中保留了对函数的十二次调用。在 function() 中,我们用更正确的术语 Task 代替了 Process:
import multiprocessing
import time
def function(i):
process = multiprocessing.current_process()
print (“start Task %i(pid:%s)” %(i,process.pid))
time.sleep(2)
print (“end Task %i(pid:%s)” %(i,process.pid))
return
if name == ‘main‘:
pool = multiprocessing.Pool(processes=4)
print(“Processes started: %s” %pool._processes)
for i in range(12):
results = pool.apply(function, args=(i,))
pool.close()
print(“END Program”)
因此,在这种情况下,我们将有 12 个任务,分别由 4 个进程执行。执行新修改的代码后,我们将得到类似下面的结果:
start Task 0(pid:96776)
end Task 0(pid:96776)
start Task 1(pid:25644)
end Task 1(pid:25644)
start Task 2(pid:21228)
end Task 2(pid:21228)
start Task 3(pid:77736)
end Task 3(pid:77736)
start Task 4(pid:96776)
end Task 4(pid:96776)
start Task 5(pid:25644)
end Task 5(pid:25644)
start Task 6(pid:21228)
end Task 6(pid:21228)
start Task 7(pid:77736)
end Task 7(pid:77736)
start Task 8(pid:96776)
end Task 8(pid:96776)
start Task 9(pid:25644)
end Task 9(pid:25644)
start Task 10(pid:21228)
end Task 10(pid:21228)
start Task 11(pid:77736)
end Task 11(pid:77736)
END Program
从结果中我们可以看到,所有 12 项任务都是按顺序执行的,但这次是由 4 个进程(工作者)轮流激活和停用自己来执行所有任务。
3.1.3 将进程定义为子类
到目前为止,我们已经了解了如何通过 Process() 构造函数在代码中定义并行工作的进程。然后再为其分配一个包含并行执行代码的目标函数。实现并行进程方案的另一种方法是将后者定义为 Process() 的子类。在这些子类中,它们的功能将通过覆盖 init () 和 run() 方法来指定。在这种情况下,并行运行的代码将被插入类的 run() 方法中。
让我们以下面的代码为例,通过实际操作更好地理解这一概念:
from multiprocessing import Process
import time
import random
class ChildProcess(Process):
def init(self, count):
Process.init(self)
self.count = count
def run(self):
print (“start Process %s” %self.count)
time.sleep(2)
print (“end Process %s” %self.count)
if name == ‘main‘:
processes = []
n_procs = 5
for i in range(n_procs):
p = ChildProcess(i)
processes.append(p)
p.start()
for i in range(n_procs):
processes[i].join()
从代码中我们可以看到,我们定义了 Process() 的一个子类,并将其命名为 ChildProcess()。在该类中,我们有两个重载方法。在 init ()中,我们将定义要添加的子类的属性,因为在本例中,计数参数会传递给构造函数。在 run() 方法中,我们将插入目标函数中的代码。
运行代码后,我们将得到类似下面的结果:
start Process 0
start Process 1
start Process 2
start Process 3
start Process 4
end Process 0
end Process 1
end Process 2
end Process 3
end Process 4
3.2 进程间通信
进程与线程不同,一般不共享内存空间,因此需要一种不同的机制来相互通信和交换数据。然而,多进程模块基于共享内存模式,因此允许进程之间共享资源。由于这种做法会导致与线程相同的问题,即出现竞赛条件现象和进程间缺乏同步,因此强烈建议不要使用。多进程模块和线程模块一样,提供了用于进程同步的工具,如 semaphores、锁、事件等,但不建议采用这种方法。与线程并行处理一组进程不同,多进程模块提供的通信通道可用作安全和同步的数据交换机制。通信通道有两种不同的模式:
队列
管道
这些通道是通过多进程模块本身提供的 Queue 和 Pipe 类在代码中定义的。这两个类都配有 Queue() 和 Pipe() 构造函数,以及一系列管理进程间数据交换的方法和内部同步机制。一旦在主进程层定义了这些通信通道,它们就会成为以完美同步方式交换数据的绝佳工具。事实上,有了它们,各进程就能以安全和同步的方式发送和接收数据,而不会出现竞赛条件的风险。此外,如果发送的数据多于请求的数据,这些数据不会被覆盖,而是会累积到这些对象中,从而提供数据缓冲。
3.2.1 队列(Queue)
在 Python 中,也有属于队列模块的队列(我们已经在线程中使用过)。但实际上,它们的内部实现是不同的,multiprocessing.Queue 使用一种不同的数据传输机制,专门用于与进程一起工作。也就是说,它们使用的是进程专用的消息传递范式,避免使用同步机制来添加队列。
队列是一种实现先进先出(FIFO)队列的数据结构。进程可以通过 put() 方法向队列添加数据,也可以通过 get() 方法从队列接收数据。队列中会保持输入数据的顺序,然后按照输入顺序向请求数据的进程发送数据。
数据可以在队列中累积,既可以是简单的数字数据,也可以是复杂的对象。队列没有大小限制,因此不会出现数据溢出问题。不过,您可以定义一个容量限制,在 Queue() 构造函数的参数中传递一个 maxsize 值,代表所包含对象的最大数量:
queue = multiprocessing.Queue(maxsize=100)
不过,就队列的容量而言,在执行过程中有时需要知道队列中包含的对象的当前数量。为此,有一个 qsize() 方法可以返回这个数字:
size = queue.qsize()
不过,在管理队列时,如果想在程序流程中附加条件,那么知道队列是空还是满可能会很有用。empty() 方法在队列为空时返回值为 True 的布尔值,而 full() 方法在队列为满时返回值为 True:
if queue.empty():
…
if queue.full():
…
为了更好地理解队列的工作原理,我们可以使用消费者-生产者范式,这种范式在发送和接收数据时非常有用。生产者指的是任何会在执行过程中提供数据的进程,而消费者指的是任何需要这些数据才能完成执行的进程。因此,要使至少由两个进程创建的这一机制发挥作用,就必须进行数据交换。由于这些进程没有共同的内存区域,它们将通过队列交换数据。消费者将向队列发送生成的数据,而消费者则向生产者请求数据。消费者和生产者始终是异步的,除了通过队列进行的数据流外,它们通常不相互依赖。由于消费者和生产者的时间很少重合,在某些情况下,生产者会以更快的速度生产消费者数据,这些数据可以有序地累积在队列中,而不会被覆盖。这样,消费者就有足够的时间有序地使用这些产生的数据。
下面的代码实现了一个简单的生产者-消费者范例,它基于两个并行进程,利用队列交换数据:
from multiprocessing import Process, Pipe
import time
import random
class Consumer(Process):
def init(self, count, conn):
Process.init(self)
self.count = count
self.conn = conn
def run(self):
for i in range(self.count):
local = self.conn.recv()
time.sleep(2)
print(“consumer has used this: %s” %local)
class Producer(Process):
def init(self, count, conn):
Process.init(self)
self.count = count
self.conn = conn
def request(self):
time.sleep(1)
return random.randint(0,100)
def run(self):
for i in range(self.count):
local = self.request()
self.conn.send(local)
print(“producer has loaded this: %s” %local)
if name == ‘main‘:
recver, sender = Pipe()
count = 5
p1 = Producer(count, sender)
p2 = Consumer(count, recver)
p1.start()
p2.start()
p1.join()
p2.join()
recver.close()
sender.close()
运行该代码将产生类似下面的结果:
producer has loaded this: 83
producer has loaded this: 79
consumer has used this: 83
producer has loaded this: 99
producer has loaded this: 90
consumer has used this: 79
producer has loaded this: 90
consumer has used this: 99
consumer has used this: 90
consumer has used this: 90
我们可以看到,生产者进程生成的值被累积到队列中,随后被消费者进程完全同步地使用,不会丢失或覆盖任何值。
3.2.2 管道(Pipe)
另一个执行进程间通信通道功能的类是管道(Pipe)。管道实现了两个进程之间的双向通信,目的是在它们之间发送和接收数据。与队列类一样,管道类也有自己的 Pipe() 构造函数,在声明时会创建两个不同的 multiprocessing.connection.Connection 对象。因此,通常在声明 Pipe() 构造函数时,要将两个连接实例分开:
conn1, conn2 = multiprocessing.Pipe()
其中一个连接用于发送数据,另一个用于接收数据。默认情况下,第一个连接(conn1)只用于接收数据,而第二个连接(conn2)只用于发送数据。
除了双向模式外,管道还可以在双工模式下创建,在这种模式下,两个连接中的每一个都可以用来发送和接收数据。在这种情况下,我们必须将构造函数中传递的双工参数设置为 True:
conn1、conn2 = multiprocessing.Pipe(duplex=True)
在这两种模式下,进程都可以通过生成的两个 Connection 对象,通过 send() 方法发送数据,并通过 recv() 方法接收数据:
conn2.send(object)
object = conn1.recv()
至于队列,发送对象的大小没有限制,唯一的条件是它们可以被选中。发送时,它们将被选中,而接收时,它们将自动被取消选中。
与队列一样,管道类也提供了一系列方法,可用于管理在使用过程中创建的连接。例如,管道的状态可以由 poll() 函数控制。该函数返回一个布尔值: 如果连接中有数据等待进程接收,则返回 True。该方法在管理程序执行流的条件下非常有用:
if conn1.poll():
让我们通过下面的示例代码来了解一下刚才介绍的概念:
from multiprocessing import Process, Pipe
import time
import random
class Consumer(Process):
def init(self, count, conn):
Process.init(self)
self.count = count
self.conn = conn
def run(self):
for i in range(self.count):
local = self.conn.recv()
time.sleep(2)
print(“consumer has used this: %s” %local)
class Producer(Process):
def init(self, count, conn):
Process.init(self)
self.count = count
self.conn = conn
def request(self):
time.sleep(1)
return random.randint(0,100)
def run(self):
for i in range(self.count):
local = self.request()
self.conn.send(local)
print(“producer has loaded this: %s” %local)
if name == ‘main‘:
recver, sender = Pipe()
count = 5
p1 = Producer(count, sender)
p2 = Consumer(count, recver)
p1.start()
p2.start()
p1.join()
p2.join()
recver.close()
sender.close()
此外,在本例中,我们使用了生产者-消费者范式,并尽可能保留了之前使用队列的代码。这样做是为了帮助读者对两种模式进行适当的比较。运行代码后,您将得到类似下面的结果:
producer has loaded this: 84
producer has loaded this: 43
consumer has used this: 84
producer has loaded this: 49
producer has loaded this: 15
consumer has used this: 43
producer has loaded this: 27
consumer has used this: 49
consumer has used this: 15
consumer has used this: 27
在使用管道结束时,我们分别调用了两个连接的 close() 方法。为了释放管道使用的所有资源,这个操作非常重要。
管道最重要、也可能是最有局限性的一点是,它只能在两个端点之间工作,因此只能在两个进程之间工作。如果需要同时处理多个进程,则需要为每对进程建立一个管道。很明显,当并行工作的进程变多时,管道的管理就会出现问题。
3.2.3 管道与队列
我们刚才看到,作为通信通道,多进程模块提供了管道和队列两种解决方案。那么,在并行进程间交换数据时,应该选择哪一种呢?
乍一看,由于管道只能在两个进程之间有限地使用,队列似乎总是最好的解决方案。但事实并非总是如此。
管道是一个比队列简单得多的类。管道类的实现层次较低,正是这一特点使管道类在两个进程间交换数据时更高效、更快捷。因此,如果这种类型的连接可以在成对的进程之间使用,那么它比队列更可取。另一方面,如果数据交换是一致的,且参与交换的进程数量较多,则只能使用队列。
3.3 通过进程池映射函数
并行编程的另一个非常重要的方面是函数的映射。事实上,可以在多个并行进程之间扩展 Python 中 map() 的功能。当你想在一个可迭代对象上应用一个函数时,这个函数非常有用。事实上,map() 函数会将函数应用到对象的每个元素上,并返回一个大小相等的元素数组,每个元素上都有函数的结果。
为了更好地理解 map() 函数的工作原理,没有比一个直接的例子更好的了。让我们看看下面的代码:
import time
import math
import numpy as np
def func(value):
result = math.sqrt(value)
print(“The value %s and the elaboration is %s” %(value, result) )
return result
if name == ‘main‘:
data = np.array([10,3,6,1])
results = map(func, data)
for result in results:
print(“This is the result: %s” %result)
在代码中,我们选择了一个包含四个元素的 NumPy 数组,并在其上应用函数 func()。函数执行后,我们将得到一个由四个元素组成的数组,每个元素上都有应用到相应元素的函数的返回值。
运行代码后,结果如下:
The value 10 and the elaboration is 3.1622776601683795
This is the result: 3.1622776601683795
The value 3 and the elaboration is 1.7320508075688772
This is the result: 1.7320508075688772
The value 6 and the elaboration is 2.449489742783178
This is the result: 2.449489742783178
The value 1 and the elaboration is 1.0
This is the result: 1.0
这种可迭代对象映射机制可以在并行编程中进行扩展。如果每个元素上的函数计算可以并行进行,那么效率将大大提高。例如,可以将每个元素分配给一个进程,该进程的目标只是 map() 中传递的函数。在这种情况下,所有元素将同时被计算。对于一个包含 n 个元素的数组,普通程序需要 n * t 时间,其中 t 是函数的执行时间。而在并行情况下,由 n 个元素组成的同一个数组将被传递给 n 个进程,执行时间将被限制在 t(如果系统中有 n 个内核或 n 个处理器)。因此,效率的提高是显而易见的。
对于函数的并行映射,多进程模块提供了 multiprocessing.pool.Pool 类。该类为此提供了两个方法:
map()
map_async()
在并行编程的情况下,由于可能的同步方式不同,因此有两种方法。它们的工作方式几乎相同,只是当我们需要阻塞程序的执行,直到并行的所有目标函数都完成工作时,应使用 map() 函数。而 map_async() 函数应在不会阻塞整个程序执行的情况下使用。
map() 方法接收两个参数,即目标函数和可迭代对象:
results = pool.map(target, iterable):
顾名思义,池类将创建一个与可迭代元素数量相等的进程池,并将目标函数传递给这些进程。
为了更好地理解这些概念及其工作原理,我们有必要举一些实际例子:
import time
import math
import numpy as np
from multiprocessing.pool import Pool
def func(value):
result = math.sqrt(value)
print(“The value %s and the elaboration is %s” %(value, result) )
time.sleep(value)
return result
if name == ‘main‘:
with Pool() as pool:
data = np.array([10,3,6,1])
results = pool.map(func, data)
print(“The main process is going on…”)
for result in results:
print(“This is the result: %s” %result)
print(“END Program”)
运行这个程序,我们会得到类似下面的结果:
The value 10 and the elaboration is 3.1622776601683795
The value 3 and the elaboration is 1.7320508075688772
The value 6 and the elaboration is 2.449489742783178
The value 1 and the elaboration is 1.0
The main process is going on…
This is the result: 3.1622776601683795
This is the result: 1.7320508075688772
This is the result: 2.449489742783178
This is the result: 1.0
END Program
至于 map_async()函数,我们可以修改前面的代码,进行适当的替换,如下所示:
import time
import math
import numpy as np
from multiprocessing.pool import Pool
def func(value):
result = math.sqrt(value)
print(“The value %s and the elaboration is %s” %(value, result) )
time.sleep(value)
return result
if name == ‘main‘:
with Pool() as pool:
data = np.array([10,3,6,1])
results = pool.map_async(func, data)
print(“Main Process is going on…”)
for result in results.get():
print(“This is the result: %s” %result)
print(“END Program”)
运行该程序后,我们会得到与下面类似的结果:
Main Process is going on…
The value 10 and the elaboration is 3.1622776601683795
The value 3 and the elaboration is 1.7320508075688772
The value 6 and the elaboration is 2.449489742783178
The value 1 and the elaboration is 1.0
This is the result: 3.1622776601683795
This is the result: 1.7320508075688772
This is the result: 2.449489742783178
This is the result: 1.0
END Program
从两个不同的结果可以看出,我们得到了预期的行为。通过使用 map() 方法,我们获得了一种同步机制,它可以中断主进程的执行流程。直到 map() 启动的所有并行进程都执行完毕,主进程才会重新启动。事实上,只有在所有子进程都完成任务并将结果返回给结果后,才会出现 “主进程正在继续…… ”的文本。使用 map_async()时,主进程的执行将继续进行,而不会等待结果的完整评估。文本开头会显示 “主进程正在继续……”。
参考资料
软件测试精品书籍文档下载持续更新 https://github.com/china-testing/python-testing-examples 请点赞,谢谢!
本文涉及的python测试开发库 谢谢点赞! https://github.com/china-testing/python_cn_resouce
python精品书籍下载 https://github.com/china-testing/python_cn_resouce/blob/main/python_good_books.md
Linux精品书籍下载 https://www.cnblogs.com/testing-/p/17438558.html
3.4 使用 chunksize 并行映射
map() 函数将对可迭代表中的每个元素应用一个函数。如果可迭代元素的数量很多,那么调用尽可能多的进程来执行这些元素可能会非常低效。
更有效的方法是将可迭代元素划分为一定数量的部分,每个部分分配给一个进程。这可以通过向 map() 函数传递 chunksize 参数来实现。
让我们修改之前使用的 map() 方法示例代码,增加 np.array() 中元素的数量。然后,我们将为每个进程设置数组中四个元素的部分,并将等于 4 的 chunksize 参数传递给 map() 方法:
import time
import math
import numpy as np
from multiprocessing.pool import Pool
def func(value):
result = math.sqrt(value)
print(“The value %s and the elaboration is %s” %(value, result) )
time.sleep(value)
return result
if name == ‘main‘:
with Pool() as pool:
data = np.array([10,3,6,1,4,5,2,9,7,3,4,6])
results = pool.map(func, data, chunksize=4)
print(“The main process is going on…”)
for result in results:
print(“This is the result: %s” %result)
print(“END Program”)
运行代码后,结果如下:
The value 10 and the elaboration is 3.1622776601683795
The value 7 and the elaboration is 2.6457513110645907
The value 4 and the elaboration is 2.0
The value 5 and the elaboration is 2.23606797749979
The value 3 and the elaboration is 1.7320508075688772
The value 2 and the elaboration is 1.4142135623730951
The value 3 and the elaboration is 1.7320508075688772
The value 4 and the elaboration is 2.0
The value 9 and the elaboration is 3.0
The value 6 and the elaboration is 2.449489742783178
The value 6 and the elaboration is 2.449489742783178
The value 1 and the elaboration is 1.0
The main process is going on…
This is the result: 3.1622776601683795
This is the result: 1.7320508075688772
This is the result: 2.449489742783178
This is the result: 1.0
This is the result: 2.0
This is the result: 2.23606797749979
This is the result: 1.4142135623730951
This is the result: 3.0
This is the result: 2.6457513110645907
This is the result: 1.7320508075688772
This is the result: 2.0
This is the result: 2.449489742783178
END Program
除了 map() 函数,Pool 类还提供了一系列具有类似行为的方法,如 imap() 和 apply(),所有这些方法也都是异步版本。
3.5 ProcessPoolExecutor
我们还有一个与进程池机制类似的类,即由 concurrent.future 模块分发的 ProcessPoolExecutor。该类也提供了一个可重复使用的进程池,用于执行特定操作,包括 map()。
此外,在这种情况下,您还可以指定要执行的目标函数的名称,以及将在其上应用该函数的可迭代次数。一旦所有分配的任务都已完成,ProcessPoolExecutor 必须关闭,以便释放所有使用的资源。关闭时会使用 shutdown() 函数。
但即使在这种情况下,也有更简单、更高效的结构。事实上,ProcessPoolExecutor 与上下文管理器和 with 声明是兼容的:
with ProcessPoolExecutor() as executor:
在这种情况下,当执行退出上下文管理器块时,会自动调用 shutdown() 函数。上下文管理器相当于:
try:
executor = ProcessPoolExecutor()
finally:
executor.shutdown()
由此可见,在这些情况下使用上下文管理器非常方便。ProcessPoolExecutor 作为一个池对于并行运行 map() 函数非常有用。例如,我们可以在下面的代码中看到这一点:
import time
import math
import numpy as np
from concurrent.futures import ProcessPoolExecutor
def func(value):
result = math.sqrt(value)
print(“The value %s and the elaboration is %s” %(value, result) )
time.sleep(value)
return result
if name == ‘main‘:
with ProcessPoolExecutor(10) as executor:
data = np.array([10,3,6,1])
for result in executor.map(func, data):
print(“This is the result: %s” %result)
print(“END Program”)
执行:
The value 10 and the elaboration is 3.1622776601683795
The value 3 and the elaboration is 1.7320508075688772
The value 6 and the elaboration is 2.449489742783178
The value 1 and the elaboration is 1.0
This is the result: 3.1622776601683795
This is the result: 1.7320508075688772
This is the result: 2.449489742783178
This is the result: 1.0
END Program
在此之前,我们已经看过 Pool() 的 map() 方法,该方法可以使用 chunksize 参数将多个元素的部分分配给每个进程。在 ProcessPoolExecutor 中,map() 方法也使用了相同的参数。
此外,为了识别是哪个进程处理了 np.array 中的不同元素,我们导入了 os 模块,该模块使用 getpid() 来获取不同进程的 ID:
import time
import math
import os
import numpy as np
from concurrent.futures import ProcessPoolExecutor
def func(value):
result = math.sqrt(value)
pid = os.getpid()
print(“[Pid:%s] The value %s and the elaboration is %s” %(pid, value, result) )
time.sleep(value)
return result
if name == ‘main‘:
with ProcessPoolExecutor(10) as executor:
data = np.array([10,3,6,1,4,5,2,9,7,3,4,6])
for result in executor.map(func, data, chunksize=4):
print(“This is the result: %s” %result)
print(“END Program”)
运行代码将产生类似下面的结果:
[Pid:70608] The value 10 and the elaboration is 3.1622776601683795
[Pid:94972] The value 4 and the elaboration is 2.0
[Pid:111764] The value 7 and the elaboration is 2.6457513110645907
[Pid:94972] The value 5 and the elaboration is 2.23606797749979
[Pid:111764] The value 3 and the elaboration is 1.7320508075688772
[Pid:94972] The value 2 and the elaboration is 1.4142135623730951
[Pid:70608] The value 3 and the elaboration is 1.7320508075688772
[Pid:111764] The value 4 and the elaboration is 2.0
[Pid:94972] The value 9 and the elaboration is 3.0
[Pid:70608] The value 6 and the elaboration is 2.449489742783178
[Pid:111764] The value 6 and the elaboration is 2.449489742783178
[Pid:70608] The value 1 and the elaboration is 1.0
This is the result: 3.1622776601683795
This is the result: 1.7320508075688772
This is the result: 2.449489742783178
This is the result: 1.0
This is the result: 2.0
This is the result: 2.23606797749979
This is the result: 1.4142135623730951
This is the result: 3.0
This is the result: 2.6457513110645907
This is the result: 1.7320508075688772
This is the result: 2.0
This is the result: 2.449489742783178
END Program
钉
声明:文中观点不代表本站立场。本文传送门:https://eyangzhen.com/423789.html