5 利用分布式系统实现并行性
分布式系统属于并行和并发编程的范畴,由于其性能水平,可以被证明是一种有效的解决方案。因此,必须熟悉这些类型的系统,以及 Python 在这方面提供的相关解决方案。在本章中,我们将介绍几种可能的解决方案。我们将首先讨论 Celery,它是目前 Python 中实现分布式系统的参考点。该库是构建此类系统的有效工具,也是了解可能的体系结构、组成体系结构的元素以及它们背后的各种机制的绝佳范例。通过一系列示例,我们将了解如何使用该库执行多个并发操作(称为任务),以及这些操作如何在基于 Celery 的系统上并行分布和执行。此外,我们随后还将介绍其他替代解决方案,从非常相似的 Dramatiq 到更简单但概念上与之不同的 SCOOP。
主要内容:
分布式系统
客户端-服务器模型
Celery 框架
消息传递
任务
工作站
Dramatiq
Docker
SCOOP
5.1 分布式系统和编程
分布式系统是指组成系统的应用元素在不同机器上运行,并为同一目的协同工作的任何系统。这些元素位于不同的机器上,甚至可能在地理位置上相距甚远,但它们可以通过连接网络交换信息进行通信。
同时独立执行任务的实体的存在,是我们在前几章中看到的线程和进程的进一步延伸。因此,分布式系统除了可以在客户机-服务器模型的关键模式下执行有用的操作外,还完全可以并发甚至并行地执行一系列操作。由此可见,分布式系统完全可以纳入并行编程的范畴,而且所涉及的大多数概念也完全适用于这些系统。
因此,我们可以将并行编程扩展到客户机-服务器模型中,即客户机向服务器提出一系列任务请求,服务器负责在分布式系统中搜索能够并行执行这些任务的一系列计算实体(称为 Worker):
服务器将收集异步生成的结果,并将其发送给提出请求的客户端。这种编程方式可定义为分布式编程,也是本章的主题。
5.1.1 分布式系统的优缺点
由于以下因素,这些系统可以实现出色的性能:可扩展性、并发性、灵活性、鲁棒性(容错)
可扩展性
随着要解决的问题对资源需求的增加,系统中存在的元素数量也有可能增加,这就保证了系统的可扩展性。从理论上讲,在分布式系统中,这种增加是没有限制的。如果问题增多,只需在系统元素的其他副本中添加其他机器,就能接管竞争计算。很明显,分布式系统注重的是横向可扩展性,而不是纵向可扩展性。
并发性
这是这些系统架构中隐含的另一个特点。事实上,在执行一系列任务时,分布式系统能够将每项任务分配给一个独立的处理单元,该处理单元将与其他单元同时运行。因此,所有任务都将在竞争中执行,而且往往是并行执行。
灵活性
分布式系统的灵活性在于它是由众多元素组成的,每个元素都扮演一定的角色,分布在与系统相连的某些机器上。这些机器在本质上可以是异构的,系统的各个组成部分可以被视为真正独立的应用程序,可以用其他发挥相同作用的兼容应用程序来替代。因此,在构建分布式系统时,可以选择由哪个应用程序来扮演某个组件的角色。我们可以根据不同的操作系统、应用程序本身的功能或我们希望在系统中加入的特定特性来做出选择。所有这些都能让我们根据需要创建一个始终不同的分布式系统。
系统的鲁棒性或容错性
系统中的冗余元件也保证了这一点。在不同的机器上都有元件的副本,这就保证了即使应用程序或整台机器发生故障,也能继续开展工作。这类系统能够以透明、快速的方式管理任务从故障应用程序到备用应用程序的转移。从客户的角度来看,即使发生故障,所要求的服务也不会中断。
并不是所有的东西都是金子,尽管分布式系统有很多优点,但必须考虑到它也需要付出一定的代价。
事实上,分布式系统基于大量元素,这些元素可能分布在不同的机器上,这就需要在不同元素之间建立一种通信机制:消息传递。分布式系统的各个组件利用消息相互通信和交换信息。因此,每次都需要创建一条信息,并通过网络将其发送给合适的接收者,而接收者则必须接收信息并解压缩,以搜索其中的信息。显然,这样一个系统还需要一个管理系统,而所有这些都需要消耗大量资源。
另一个问题是,分布式系统由许多不同的元素组成,这也给系统维护带来了一定的困难。事实上,有许多应用程序都可以执行这些任务,而且所有这些应用程序都应在所有系统上得到支持,并执行所有可能的功能。这的确是不可能的。因此,在构建分布式系统时,可能会出现这样的情况:我们的选择并不完美,某些元素并不能完美地支持所有所需的功能。不兼容性可能会出现,尤其是在某些元素的版本更新比其他元素快的情况下。
此外,随着系统元素的增加,操作的复杂性也会增加。因此,一旦出现问题或管理问题,检查其完美运行就会变得越来越困难。
最后需要考虑的是分布式系统的网络基础设施对与其连接的网络的依赖性。这种基础设施对提高分布式系统的效率至关重要。网络故障、配置不良或其他问题往往表明网络管理不完善。这项任务通常不属于开发人员的权限范围。
5.2 Celery
Celery 是一个用 Python 开发的框架,其目的是基于 Python 语言创建分布式系统。
因此,通过一系列组件和工具,Celery 可以让您以一种简单的方式,在每个人都能触及的范围内,构建一个适合不同需求的分布式系统。它的特殊性使我们能够构建复杂度不断增加的系统,从封闭在单台机器中的分布式系统,到由无数台同样基于不同技术的机器组成的系统网络。这种可能性在很大程度上要归功于 Python 强大的可移植性,它可以在不同的体系结构和操作系统上运行。
具体来说,Celery 分布式系统的特点可以用三个关键点来表示:可移植性、高可用性、扩展性。
在该系统中,即使其架构主要由 Celery 控制,但组成该系统的各种组件可以属于其他技术,并且可以在没有重大问题的情况下添加、替换和修改(可插拔组件)。这样,系统就可以根据不同的需求和使用的技术进行移植。这种方法也非常适合分布式系统,因为每个组件一般都可以安装在不同的机器上,物理上是分开的,甚至在地理位置上是相距甚远的,但都可以通过网络连接到系统上。
这种架构的另一个优点是,为客户提供服务的组件(如经纪人和工作者)可以有不同的数量,并分布在不同的机器上。这样,即使某些机器可能遇到硬件或软件问题,也能保证持续运行,从而使系统具有高可用性的特点。此外,此类组件的数量可根据要求进行调整,从理论上讲,这一数量没有上限。很容易通过网络连接向系统添加额外的机器,从而共享资源消耗,保持每个组件的资源消耗不变。因此,随着问题的增加,可以添加更多的组件,有效地分配资源,确保横向扩展。
5.2.1 Celery 系统的架构
在基于 Celery 的分布式系统中,有许多不同的组件在发挥作用,每个组件在分布式计算机制中都扮演着特定的角色。主要组件包括:代理、任务队列、客户端、工作进程、结果后台(可选)。
这些组件通过消息机制相互交换值和信息。要实现这一点,必须有消息传输应用程序的支持,在 Celery 系统中,消息传输应用程序被定义为 Broker。有几种与 Celery 系统兼容的消息传输应用程序,它们各自使用不同的通信协议:
客户端(CLIENTS)是指在执行任务时需要分布式系统提供的某些服务的进程。这些请求是通过消息提出的,其中包含要执行的任务的说明。这些信息被发送到代理,然后由代理进行管理。任务处理结束后,客户端将在结果后台(通常是数据库)找到请求的结果。这种架构的示意图如图 5.2 所示。
工作进程(WORKERS)是存在于同一台机器上或分布在网络上并与 Celery 系统相连的其他机器上的进程。这些进程被专门设计用于执行由代理分派给它们的任务。一旦启动,工作进程就会等待,一旦收到分配给它们的任务,它们就会执行任务,最后将结果发送到结果后台。
一般来说,这些工作者可以执行各种任务。这只会提高系统的效率。如果工人没有收到特定任务,它可以执行任务队列中的其他类型任务,而不是等待。
在组成 Celery 分布式架构的各个组件中,代理和其中的任务队列起着核心作用。代理执行两项基本任务:
第一项任务是通过在任务队列中输入相关任务来接收不同客户发送的消息。
另一项任务则是将队列中的各种任务分派给相应的等待工作者。
任务队列会累积不同客户请求的任务。这样,如果客户提出某个服务请求时,工人不能立即提供服务,该任务就会留在任务队列中等待处理。代理将负责把各种任务分配给各个等待的工作者。这样,就能以异步和并发的方式管理任务的执行。
结果后台的任务是收集不同工作者在执行任务期间产生的结果。这些结果将通过读取的方式提供给客户端。此外,在这一阶段,返回值会进入一种异步机制,这样就可以积累结果,而无需在返回后立即管理单个结果。这些机制在分布式系统中是必要的,因为在分布式系统中,各个组件彼此独立、异步行动,在不可预测的时间内进行请求、处理和管理结果的活动,在这种情况下,等待时间是必要的。
5.2.2 任务(Tasks)
正如我们在前面的章节中所看到的,Celery 系统(以及一般的分布式系统)是基于在不同组件之间有效执行任务的分配。在这种情况下,任务的概念尤其重要。任何需要在系统中分配执行的任务都必须封装在一个任务中。任务也可定义为可在此类系统中分配的工作单位。最后一个定义对我们非常有用,因为它可以追溯到并发和并行编程,在并发和并行编程中,任何任务一般都可以划分为小的原子部分,即可以单独和独立执行的部分,也被定义为任务。
这就是分布式系统和并发编程这两种技术的结合点。一旦我们将问题分解成任务,每个任务都可以独立执行;在分布式系统中,这些任务可以发送到代理,由分布在网络上的不同工作者并发执行。随后产生的结果将被收集到结果后台,并由客户端程序重新组合,以获得最终结果。在处理线程和异步共同程序时,我、事件和小事件。所们已经遇到过类似的行为。但在这种情况下,我们有了 Worker 这个概念实体,在这个实体下可以识别出许多不同的现实,如进程、线程有这些都以透明方式或通过适当设置进行管理。
5.2.3 设置 Celery 系统
在简洁而完整地定义了基于 Celery 的系统组件的理论部分后,现在让我们进入实际应用,安装和设置所有组件。
5.2.4 安装 Anaconda
一个实用技巧:当你想构建测试系统时,就像本书中的例子一样,创建一个虚拟环境来安装不同的 Python 包和模块是一个很好的做法。当出现错误或不再需要时,可以顺利地将其删除,而不会在应用系统中安装的 Python 基本版本与测试环境安装过程中的所有依赖、删除和手动安装之间产生冲突。
我们可以使用 Anaconda 开发平台,它在管理虚拟环境和软件包安装方面有很大帮助。对于那些还不熟悉该平台的人来说,可以从网上(https://www.anaconda.com/products/distribution)免费下载。
安装完成后,进入 Environments(环境)面板创建虚拟环境:
然后点击底部中央的创建按钮:
您可以在其中输入要使用的虚拟环境的名称并选择其所基于的 Python 版本:
设置好 Python 名称和版本后,只需按下创建按钮即可创建虚拟环境,然后在 Anaconda 导航面板上找到它。此时,只需点击将出现在中央列表中的名称,即可激活虚拟环境。在我的例子中,我选择将虚拟环境称为 Parallel。虚拟环境激活后,侧面会出现一个绿色图标(PLAY)。在面板的右侧,将显示虚拟环境中安装的所有 Python 软件包的列表。带有 Anaconda 图标(绿色圆圈)的软件包是通过发行版本身安装的,而带有 Python 图标的软件包则是通过 pip 命令从外部安装的:
Anaconda 是一个非常有用、非常灵活的平台。它提供了一整套开发应用程序,允许你以多种方式工作。进入 “主面板”,我们刚刚创建的虚拟环境中所有可用的应用程序就会出现:
正如我们所见,这里有许多可用的应用程序,包括非常有用的 Jupyter Notebook 和 CMD.exe Prompt。点击后者的 “启动”(Launch)按钮将打开一个完全为虚拟环境设置的命令控制台:
5.2.5 安装 Celery
在新打开的命令控制台中,我们将使用 PIP 安装 Celery 库。
pip install celery
注意:Celery 软件包存在于 Anaconda 发行版中,但并不完整(至少在我的版本中是这样)。然后,我直接从 PIP 下载 Celery,PIP 还提供了所有其他相关软件包,这些软件包将为我提供本书示例中所需的应用程序。
5.2.6 安装 Docker
当我想测试分布式环境(如本章将介绍的环境)时,我更喜欢使用的另一个应用程序是 Docker Desktop。该应用程序允许你以名为容器的软件包形式隔离提供服务器、数据库等服务的应用程序。它在开发环境中非常有用,因为这些封装在容器中的应用程序可以通过非常简单的方式激活、停用、删除和克隆。如果您想测试一个有许多活动监听服务(如 Celery)的分布式环境,那么它就是您的最佳选择。鉴于 Python 版本和软件包依赖关系的复杂性,Docker 可以让您快速更改和测试各种服务器应用程序的版本,而无需进行复杂的安装和配置。此外,同样以 Celery 为例,有不同的应用程序扮演着相同的角色,可以直接在 Celery 分布式系统中进行实时评估。通过将它们一起安装为容器,然后一个接一个地激活和测试它们,而系统的其他部分保持不变。
要安装 Docker Desktop,只需访问官方应用程序页面 (https://www.docker.com/)。
一旦安装了 Docker Desktop,就可以通过命令行安装一系列现成的容器,其中包含各种分布式版本的应用程序:
docker run -d appname:version
5.2.7 安装消息传输器(Broker)
安装 Celery 后,最关键的组件是安装 Broker。有几款消息传输应用程序与 Celery 兼容,但并非所有这些应用程序都完全支持 Celery 经纪人的所有机制。就功能而言,最完整、最兼容的消息传输应用程序是RabbitMQ、Redis。
如果您使用的是 Linux,要在服务器机器上安装 Redis 应用程序,请执行以下操作
$ sudo apt-get install redis-server
然后在命令行中输入以下命令启动服务器
$ redis-server
就我而言,由于使用的是单机(客户端-服务器),我更喜欢使用 Docker 应用程序来安装和启动所有服务以及 Redis:
$ docker run -d -p 6379:6379 redis
然后安装特定的 Celery 模块,以便在其中使用 Redis:
pip install celery[redis]
另一方面,如果我们更喜欢使用 RabbitMQ 应用程序,也可以安装它:
$ sudo apt-get install rabbitmq-server
在 Docker 上安装 RabbitMQ:
docker run -d -p 5672:5672 rabbitmq:3.9
在 Docker Desktop 结束时,我们将拥有活动服务: RabbitMQ 和 Redis:
此外,在这种情况下,有必要安装专门为在其中使用 RabbitMQ 而创建的 Celery 模块:
pip install celery[librabbitmq]
5.2.8 安装结果后端
对于结果后端这样的组件,也有许多不同的应用。在 Celery 支持的结果后端中,我们可以找到RabbitMQ、Redis、MongoDB、
Memcached,为了简单起见,我们也将使用 RabbitMQ 和 Redis 作为结果后端。因此,在我们的书中没有必要做进一步的安装。
5.2.9 设置 Celery 系统
Celery 的运行不需要特殊配置,至少在最一般的情况下是这样。只有少数设置是以编程方式进行的。事实上,你可以在 Celery 实例化时定义将成为 Celery 系统一部分的组件:
要创建 Celery 的实例(按照惯例,我们将其称为 app,但也可以以任何方式调用),我们需要使用 Celery() 构造函数。它接受的参数包括:包含系统将提供给客户端的任务的模块(tasks.py),以及属于系统的组件的各种 URL 和协议。
例如,如果我们要将 RabbitMQ 服务器应用程序用作 Broker,我们将写道
from celery import Celery
app = Celery(‘tasks’, broker=’pyamqp://guest@localhost//’, backend=’rpc://’)
如您所见,RabbitMQ 服务器基于 pyamqp 协议工作,并且可以在本地主机地址(即与我们在同一台机器上)作为访客用户使用。
如果我们想将 Redis 应用程序用作 Broker,我们将写道
from celery import Celery
app = Celery(‘tasks’, broker=’redis://localhost//’, backend=’redis://localhost’)
这些代码片段必须插入 tasks.py 文件中。
配置完成后,我们就可以启动 Celery 服务了:
$ celery -A tasks worker –loglevel=INFO
运行该命令后,我们将获得类似下面的输出:
————– celery@LAPTOP-1STIRGLU v5.2.7 (dawn-chorus)
— * —–
— * —- Windows-10-10.0.22000-SP0 2022-09-22 14:40:30
- *** — * —
- ** ———- [config]
- ** ———- .> app: tasks:0x2c647cdad40
- ** ———- .> transport: amqp://guest:**@localhost:5672//
- ** ———- .> results: rpc://
- *** — * — .> concurrency: 12 (prefork)
— * —- .> task events: OFF (enable -E to monitor tasks in this worker)
— * —–
————– [queues]
.> celery exchange=celery(direct) key=celery
[tasks]
. tasks.add
[2022-09-22 14:40:30,583: INFO/MainProcess] Connected to amqp://guest:**@127.0.0.1:5672//
[2022-09-22 14:40:30,604: INFO/MainProcess] mingle: searching for neighbors
[2022-09-22 14:40:31,535: INFO/SpawnPoolWorker-1] child process 28788 calling self.run()
[2022-09-22 14:40:31,535: INFO/SpawnPoolWorker-2] child process 27892 calling self.run()
[2022-09-22 14:40:31,550: INFO/SpawnPoolWorker-3] child process 19868 calling self.run()
[2022-09-22 14:40:31,566: INFO/SpawnPoolWorker-4] child process 19072 calling self.run()
[2022-09-22 14:40:31,566: INFO/SpawnPoolWorker-8] child process 31208 calling self.run()
[2022-09-22 14:40:31,566: INFO/SpawnPoolWorker-7] child process 30672 calling self.run()
[2022-09-22 14:40:31,582: INFO/SpawnPoolWorker-5] child process 30480 calling self.run()
[2022-09-22 14:40:31,582: INFO/SpawnPoolWorker-6] child process 6328 calling self.run()
[2022-09-22 14:40:31,582: INFO/SpawnPoolWorker-9] child process 14124 calling self.run()
[2022-09-22 14:40:31,598: INFO/SpawnPoolWorker-11] child process 28616 calling self.run()
[2022-09-22 14:40:31,598: INFO/SpawnPoolWorker-10] child process 26540 calling self.run()
[2022-09-22 14:40:31,613: INFO/SpawnPoolWorker-12] child process 4888 calling self.run()
[2022-09-22 14:40:31,726: INFO/MainProcess] mingle: all alone
[2022-09-22 14:40:31,779: INFO/MainProcess] celery@LAPTOP-1STIRGLU ready.
如你所见,服务器默认情况下会启动生成 12 个工作进程。这个数量取决于运行 Celery 的机器的特性。在我的例子中,有 12 个内核,因此创建了 12 个 Worker。Broker 将保持激活状态,等待调用任务分配给 Worker。
如果在 Windows 上运行,则必须编写
$ celery -A tasks worker –pool=solo –loglevel=INFO
但在这种情况下,您将使用单线程工作,不会再出现竞争现象。在这种情况下,我们需要使用 eventlet 设置 Worker。让我们安装它们:
pip install eventlet
celery -A tasks worker –pool=eventlet –loglevel=INFO
使用 Celery 结束后,要关闭程序,请按 CTRL + C:再次点击 Ctrl+C 将终止所有正在运行的任务!
所有进程结束后,提示符将返回给您。
有关命令行选项的完整列表
$ celery worker -help
但还有许多其他命令可用,要了解它们,可以在命令行中写入:
$ celery -help
5.2.10 定义任务
现在我们已经有了一个简单但足够的基于 Celery 的分布式系统,可以开始定义任务和测试示例了。
在 Celery 的服务器端,在 tasks.py 模块文件中,您可以实现系统将提供给客户端的任务。要在代码中识别任务,需要使用 @app.task 装饰器,其中 app 是在 tasks.py 文件中使用的 Celery 实例:
@app.task
def my_task():
return “This is a task”
5.2.11 调用任务
任务是分布式系统提供的服务,客户可以请求这些服务。因此,在客户代码中,我们可以随时请求服务,也就是调用任务。为此,Celery 为我们提供了两种不同的方法:
delay()
apply_async()
这两种方法都是通过消息将任务发送给代理,它们之间的区别在于可传递参数的类型。delay() 方法使用单独放置的参数:
task.delay(arg1, arg2, kwarg1=’x’, kwarg2=’y’)
而 apply_async() 会将参数封装在一个列表和一个字典中:
task.apply_async(args=[arg1, arg2], kwargs={‘kwarg1’: ‘x’, ‘kwarg2’: ‘y’})
当我们调用一个任务时,会有一个 AsyncResult 值作为返回值。通过该对象,我们可以从客户端检查任务的状态。
5.2.12 任务示例
在服务器 tasks.py 模块文件中,我们定义了如下任务:
@app.task
def add(x, y):
return x + y
在客户端,我们将在 tasks.py 所在目录的同级打开一个 Python 会话,这大大简化了操作。打开会话后,我们首先导入在 tasks.py 中创建的任务,然后调用它,只需使用 delay() 方法即可:
from tasks import add
add.delay(4,4)
执行客户端代码时,任务将封装在消息中并发送给监听代理。经 Worker 处理后,您可以在 Worker 所在机器的控制台上看到输出结果:
[2022-09-22 15:30:46,024: INFO/MainProcess] Task tasks.add[4966d730-5334-4089-8395-cb04f40de3c9] received
[2022-09-22 15:30:46,041: INFO/MainProcess] Task tasks.add[4966d730-5334-4089-8395-cb04f40de3c9] succeeded in 0.01600000000325963s: 8
我们可以看到,输出中显示了两行。第一行显示代理已收到消息,第二行显示任务已成功执行,并报告了执行时间和返回结果。
如果我们想使用 apply_async(),就必须在列表中传递任务所需的两个参数:
from tasks import add
add.delay(4,4)
如果我们设置了使用结果后台的 Celery,我们也可以通过以下方式在客户端获取该值:ares = add.delay(4,4)
ares.get()
8
调用任务会返回一个 AsyncResult 对象。通过 get() 方法,我们将要求该对象从结果后台读取结果。该值只有在任务处理完毕后才能获得,因此需要一些时间。由于 get() 调用必须是异步的,因此有必要先检查被调用任务的状态。例如,可以通过 ready() 方法了解任务是否已完成:ares.ready()
True
这将返回一个布尔值: 如果任务已完成,则返回 True,否则返回 False。另一个基本信息是使用 successful() 和 failed() 方法分别通过引发异常来了解任务是成功还是失败。这些方法也返回布尔值。所有这些方法和其他类似方法都有助于根据任务的状态管理程序的执行控制:ares.successful()
True
ares.failed()
False
5.2.13 签名和原语
我们已经了解了如何在客户端程序中调用任务。但 Celery 提供的可能性还不止于此。事实上,Celery 库还提供了一系列工具,让您能以非常简单的代码方式,同时在多个任务上构建复杂的工作流。
首先,Celery 使用了签名(signatures)这一概念,可以简化任务调用,例如
s1 = add.signature((2,2), countdown=10)
在这种情况下,我们将任务调用存储在签名中,这样就可以在代码中使用它,而无需重写所有内容,只需使用刚刚定义的签名即可:s1.delay()
还有一个默认签名 s(),它可以简单地替换对 delay() 的调用:add.delay(4,5)
使用此签名即可:add.s(4,5)
这样,程序员的工作就轻松多了。事实上,在 Celery 库提供的一系列函数中,这些签名作为参数使用是必不可少的:group()
、chain()和chord()。
这些函数基于原语,而原语不过是具有复杂调用结构的签名,正是因为它们的形状,才能在分布式系统中产生特殊的工作流程。
例如,group() 方法将同时执行的任务列表作为参数。返回值是一个特定值,允许将其作为一个组进行检查,最后得到一个包含按照被调用任务顺序排列的元素的列表:
from celery import group
g = group(add.s(i,i) for i in range(10))()
g.get()
[0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
从服务器控制台,我们将看到以下输出:
[2022-09-22 16:24:24,645: INFO/MainProcess] Task tasks.add[4dfadc20-7deb-459c-8dc5-9db5ea377263] succeeded in 0.031000000017229468s: 2
[2022-09-22 16:24:24,670: INFO/MainProcess] Task tasks.add[fb8fa5fc-7c9f-4659-8168-26133100f17b] succeeded in 0.01600000000325963s: 6
[2022-09-22 16:24:24,695: INFO/MainProcess] Task tasks.add[1b9b73e9-a8f4-43d5-aacc-791a19789a1c] succeeded in 0.030999999959021807s: 4
[2022-09-22 16:24:24,721: INFO/MainProcess] Task tasks.add[f2764fb6-ed38-4f8c-927a-0b4c4810a03f] succeeded in 0.031000000017229468s: 14
[2022-09-22 16:24:24,746: INFO/MainProcess] Task tasks.add[9e6d6b83-1b3f-4b9f-963f-2a94bea21b93] succeeded in 0.01600000000325963s: 18
[2022-09-22 16:24:24,771: INFO/MainProcess] Task tasks.add[c4b3b795-329b-4f88-8409-4f425ad986a7] succeeded in 0.031000000017229468s: 8
[2022-09-22 16:24:24,796: INFO/MainProcess] Task tasks.add[72bc2b1c-189b-4d56-921d-decbc17cc29f] succeeded in 0.01600000000325963s: 16
[2022-09-22 16:24:24,825: INFO/MainProcess] Task tasks.add[30d86bf8-2f46-4e50-b2df-1a9b051c283a] succeeded in 0.01600000000325963s: 12
[2022-09-22 16:24:24,850: INFO/MainProcess] Task tasks.add[bdbc022a-2e01-4cf9-870a-3ae0cf20ea9e] succeeded in 0.031000000017229468s: 10
我们可以看到,发送的 10 个任务是在不同时间执行的,而不是按照调用中使用的迭代顺序执行。我们可以看到,不同任务的执行时间也不尽相同。不过,各个操作的结果都是按照这个顺序收集和排序的。
另一个非常有用的方法是 chain(),它允许将多个任务以链的形式连接起来,以便将一个任务的结果作为第二个任务的参数,以此类推:
from tasks import add, mul
from celery import chain
c = chain(add.s(4,4) | mul.s(8))()
c.get()
而从服务器控制台,我们将得到以下输出:
[2022-09-22 16:41:52,501: INFO/MainProcess] Task tasks.add[9bad176a-937d-4742-8698-469b0918fe69] received
[2022-09-22 16:41:52,539: INFO/MainProcess] Task tasks.add[9bad176a-937d-4742-8698-469b0918fe69] succeeded in 0.031000000017229468s: 8
[2022-09-22 16:41:52,540: INFO/MainProcess] Task tasks.mul[21339f93-56f3-45c1-a09a-09b24b36999a] received
[2022-09-22 16:41:52,565: INFO/MainProcess] Task tasks.mul[21339f93-56f3-45c1-a09a-09b24b36999a] succeeded in 0.030999999959021807s: 64
从这里我们可以看到发生了我们所描述的情况。从链中第一个任务接收到信息后立即执行。然后将其结果作为参数传递给第二个任务,并作为信息发送回代理。然后再执行第二个任务,并将结果返回给客户端。
另一种方法是 chord(),它在回调中使用一组任务:
from tasks import add, mul, xsum
from celery import chord
c = chord((add.s(i,i) for i in range(10)), xsum.s())()
c.get()
90
注意:如果尝试用 RPC 获取chord,会收到一条错误信息,因为这种类型的结果后台没有实现这一功能:NotImplementedError: The “rpc” result backend does not support chords!
请注意,与任务连锁的组也会升级为chord,因为这种模式需要同步。
支持chord 的结果后端: Redis、数据库、Memcached 等。
在这种情况下,你必须使用 Redis 作为结果后端。替换设置并重启 Celery 服务器。
在 Celery 服务器控制台中,你将获得以下输出:
[2022-09-22 16:59:09,209: INFO/MainProcess] Task tasks.add[c4a4c71c-3c4b-45ce-aef2-842cfe024330] received
[2022-09-22 16:59:09,242: INFO/MainProcess] Task tasks.add[a03d2f04-347b-47da-b052-087ef34f478f] received
[2022-09-22 16:59:09,279: INFO/MainProcess] Task tasks.add[e3af658d-d41f-4714-87e1-1af6a63c2401] received
[2022-09-22 16:59:09,309: INFO/MainProcess] Task tasks.add[920cbf97-7a20-4c57-8024-22f69fbdccea] received
[2022-09-22 16:59:09,335: INFO/MainProcess] Task tasks.add[c4a4c71c-3c4b-45ce-aef2-842cfe024330] succeeded in 0.125s: 0
[2022-09-22 16:59:09,344: INFO/MainProcess] Task tasks.add[a03d2f04-347b-47da-b052-087ef34f478f] succeeded in 0.10899999999674037s: 2
[2022-09-22 16:59:09,349: INFO/MainProcess] Task tasks.add[2bc571ef-794a-4581-8cbe-3a8cf6dea136] received
[2022-09-22 16:59:09,379: INFO/MainProcess] Task tasks.add[e3af658d-d41f-4714-87e1-1af6a63c2401] succeeded in 0.09399999998277053s: 4
[2022-09-22 16:59:09,382: INFO/MainProcess] Task tasks.add[2e5bc833-b47e-40a7-85e9-74fc152eeb16] received
[2022-09-22 16:59:09,398: INFO/MainProcess] Task tasks.add[920cbf97-7a20-4c57-8024-22f69fbdccea] succeeded in 0.0940000000409782s: 6
[2022-09-22 16:59:09,419: INFO/MainProcess] Task tasks.add[004d45e8-1b93-4377-9695-fe000432be75] received
[2022-09-22 16:59:09,448: INFO/MainProcess] Task tasks.add[2bc571ef-794a-4581-8cbe-3a8cf6dea136] succeeded in 0.09399999998277053s: 8
[2022-09-22 16:59:09,451: INFO/MainProcess] Task tasks.add[ff29e563-cc91-44f9-a3cb-b2a1895444e3] received
[2022-09-22 16:59:09,484: INFO/MainProcess] Task tasks.add[2e5bc833-b47e-40a7-85e9-74fc152eeb16] succeeded in 0.10899999999674037s: 10
[2022-09-22 16:59:09,488: INFO/MainProcess] Task tasks.add[760cf014-8613-44f5-bbc3-77abcf6a7901] received
[2022-09-22 16:59:09,507: INFO/MainProcess] Task tasks.add[004d45e8-1b93-4377-9695-fe000432be75] succeeded in 0.09399999998277053s: 12
[2022-09-22 16:59:09,521: INFO/MainProcess] Task tasks.add[2938bca7-6566-4c47-8d0d-c8e94045ea52] received
[2022-09-22 16:59:09,538: INFO/MainProcess] Task tasks.add[ff29e563-cc91-44f9-a3cb-b2a1895444e3] succeeded in 0.07800000003771856s: 14
[2022-09-22 16:59:09,560: INFO/MainProcess] Task tasks.add[760cf014-8613-44f5-bbc3-77abcf6a7901] succeeded in 0.061999999976251274s: 16
[2022-09-22 16:59:09,575: INFO/MainProcess] Task tasks.xsum[3329ba27-eb79-47b2-9618-7e71a8cad91e] received
[2022-09-22 16:59:09,616: INFO/MainProcess] Task tasks.add[2938bca7-6566-4c47-8d0d-c8e94045ea52] succeeded in 0.09399999998277053s: 18
[2022-09-22 16:59:09,622: INFO/MainProcess] Task tasks.xsum[3329ba27-eb79-47b2-9618-7e71a8cad91e] succeeded in 0.0470000000204891s: 90
通过 chunks() 方法,我们可以将可迭代数分割成更小的块。这样,如果我们有 100 个对象,就可以把它们分成 10 个任务,每个任务有 10 个对象:
from tasks import add
from celery import chunks
z = zip(range(10),range(10))
list(z)
[(0, 0), (1, 1), (2, 2), (3, 3), (4, 4), (5, 5), (6, 6), (7, 7), (8, 8), (9, 9)]
ch = add.chunks(z,10)()
ch.get()
[[0, 2, 4, 6, 8, 10, 12, 14, 16, 18]]
5.3 作为 Celery 替代库的 Dramatiq 库
虽然 Celery 目前是那些想用 Python 建立分布式系统的人的参考库,但也有其他有效的替代库,它们的发展势头正逐渐强劲。与 Celery 基本原理差别最小的可能就是 Dramatiq 库。Dramatiq 库与 Celery 有许多相似之处,因此对于已经熟悉 Celery 的用户来说,Dramatiq 库是一个合适的候选库,因为它们都有许多共同的特点。两者都基于异步任务,并依赖于 Redis 或 RabbitMQ 等应用程序。
Celery 的缺点之一是相当复杂,而 Dramatiq 的设计要简单直观得多。
5.3.1 安装 Dramatiq
在开始举例说明之前,我们先在虚拟环境或专用虚拟环境中安装最新版的 Dramatiq(当前版本为 1.13)。打开命令控制台,插入
pip install dramatiq
在我们的示例中,我们将在 Dramatiq 中使用 Redis 应用程序;因此,我们需要安装必要的依赖项,以便使用它。为此,我们还将编写以下命令:
pip install dramatiq[redis]
但如果你想使用 RabbitMQ,则可以编写以下命令:
pip install dramatiq[rabbitmq]
参考资料
软件测试精品书籍文档下载持续更新 https://github.com/china-testing/python-testing-examples 请点赞,谢谢!
本文涉及的python测试开发库
谢谢点赞! https://github.com/china-testing/python_cn_resouce
python精品书籍下载
https://github.com/china-testing/python_cn_resouce/blob/main/python_good_books.md
Linux精品书籍下载 https://www.cnblogs.com/testing-/p/17438558.html
5.3.2 开始使用 Dramatiq
现在我们已经安装了 Dramatiq 和所有其他必要的依赖项,可以开始使用这个库了。
在 Dramatiq 中,任务被角色(actors)所取代,角色只是包含代码的函数,由 Redis 创建的 Worker 执行。Redis 将负责把行动者(任务)的活动封装到消息中,并将消息分派给各个等待的 Worker。
请按照以下步骤操作:
首先,我们实现作为服务器的代码,并将其保存为 dramaserver.py:
import dramatiq
import time
@dramatiq.actor
def wait(t,n):
time.sleep(t)
print(“I am the actor %s and I will wait for %s secs” %(n,t))
#return “I waited for {0} secs”.format(t)
从代码中我们可以看到,我们首先导入了必要的库,包括 dramatiq。然后,我们将定义执行工作者活动的函数,这些函数在代码中可以通过 @dramatiq.actor 装饰器识别。在我们的示例中,我们将定义一个 wait() 函数作为 actor,它什么也不做,只是等待一定的秒数,然后在输出中打印一条关于预期秒数的信息。
现在,我们已经定义了一个非常简单的文件服务器示例,其中描述了分布式 Dramatiq 系统执行的服务(actor),我们只需在虚拟环境控制台运行以下命令,就能激活后者:
dramatiq dramaserver
输入文件服务器的名称,即可激活其中定义的角色。命令启动后,输出中将显示以下文本:
[2022-09-27 16:46:00,775] [PID 18332] [MainThread] [dramatiq.MainProcess] [INFO] Dramatiq ‘1.13.0’ is booting up.
[2022-09-27 16:46:00,685] [PID 7452] [MainThread] [dramatiq.WorkerProcess(0)] [INFO] Worker process is ready for action.
[2022-09-27 16:46:00,580] [PID 25400] [MainThread] [dramatiq.WorkerProcess(1)] [INFO] Worker process is ready for action.
[2022-09-27 16:46:00,685] [PID 25192] [MainThread] [dramatiq.WorkerProcess(2)] [INFO] Worker process is ready for action.
[2022-09-27 16:46:00,705] [PID 6828] [MainThread] [dramatiq.WorkerProcess(3)] [INFO] Worker process is ready for action.
[2022-09-27 16:46:00,705] [PID 17300] [MainThread] [dramatiq.WorkerProcess(4)] [INFO] Worker process is ready for action.
[2022-09-27 16:46:00,700] [PID 18248] [MainThread] [dramatiq.WorkerProcess(5)] [INFO] Worker process is ready for action.
[2022-09-27 16:46:00,666] [PID 10216] [MainThread] [dramatiq.WorkerProcess(6)] [INFO] Worker process is ready for action.
[2022-09-27 16:46:00,661] [PID 9372] [MainThread] [dramatiq.WorkerProcess(7)] [INFO] Worker process is ready for action.
[2022-09-27 16:46:00,736] [PID 25952] [MainThread] [dramatiq.WorkerProcess(8)] [INFO] Worker process is ready for action.
[2022-09-27 16:46:00,705] [PID 20300] [MainThread] [dramatiq.WorkerProcess(9)] [INFO] Worker process is ready for action.
[2022-09-27 16:46:00,721] [PID 15876] [MainThread] [dramatiq.WorkerProcess(10)] [INFO] Worker process is ready for action.
[2022-09-27 16:46:00,770] [PID 6644] [MainThread] [dramatiq.WorkerProcess(11)] [INFO] Worker process is ready for action.
[2022-09-27 16:46:01,270] [PID 6640] [MainThread] [dramatiq.ForkProcess(0)] [INFO] Fork process ‘dramatiq.middleware.prometheus:_run_exposition_server’ is ready for action.
我们可以看到,Dramatiq 分布式系统中已经创建了 12 个 Worker,分别对应 12 个不同的进程(PID 在文本中标出),它们将等待接收来自 Redis 的消息,其中包含要执行的行动者。
注:在 Windows 系统中,要关闭服务器,请按右上角的 X 按钮。窗口不会关闭,但 Dramatiq 上的所有活动服务都会关闭。
至此,我们实现了客户端的客户端,并将其保存为 dramaclient.py:
import dramatiq
import time
from dramaserver import wait
a = time.perf_counter()
[wait.send(10,i) for i in range(10)]
print(“End Program”)
我们只需打开另一个虚拟环境控制台,运行客户代码即可:
python dramaclient.py
执行代码后,您将立即在控制台上看到程序结束的文本,并提示您可以执行新命令:End Program
而在服务器控制台,大约 10 秒钟后,我们将同时获得类似下面的输出:
I am the actor 10 and I will wait for 0 secs
I am the actor 10 and I will wait for 7 secs
I am the actor 10 and I will wait for 3 secs
I am the actor 10 and I will wait for 9 secs
I am the actor 10 and I will wait for 1 secs
I am the actor 10 and I will wait for 6 secs
I am the actor 10 and I will wait for 4 secs
I am the actor 10 and I will wait for 2 secs
I am the actor 10 and I will wait for 5 secs
I am the actor 10 and I will wait for 8 secs
在前面的代码中,我们使用了 for inline 命令来同时启动多个角色。但这种方法只能在有限的情况下使用,因为在这些情况下,actors 都是相同的,而且其中一个参数往往是迭代器本身的值。并行启动多个角色的更普遍方法是分组。
让我们重写之前的客户端代码如下:
import dramatiq
import time
from dramaserver import wait
from dramatiq import group
from dramatiq.brokers.redis import RedisBroker
from dramatiq.results import Results
from dramatiq.results.backends import RedisBackend
broker = RedisBroker(host=”localhost”)
dramatiq.set_broker(broker)
result_backend = RedisBackend(host=”localhost”)
broker.add_middleware(Results(backend=result_backend))
g = group([
wait.message(10,’A’),
wait.message(5,’B’),
wait.message(4,’C’),
wait.message(7,’D’),
]).run()
for res in g.get_results(block=True, timeout=12000):
print(res)
print(“End Program”)
正如我们所看到的,group() 函数接受一个调用行为体的列表,这些行为体之间也可以是不同的。但这次,actors 将调用 message() 方法来发送要使用的参数。
执行后,服务器控制台的输出结果类似于下面的内容:
I am the actor C and I will wait for 4 secs
I am the actor B and I will wait for 5 secs
I am the actor D and I will wait for 7 secs
I am the actor A and I will wait for 10 secs
这次的执行时间不同,因此各行将在不同的时间按执行顺序出现。
5.3.3 结果管理
到目前为止,我们已经管理了不产生结果(即返回值)的演员。实际上,Dramatiq 需要对这些角色进行特殊管理。这些返回值也必须包含在消息中,并被收集到结果后台,这与我们在 Celery 中看到的情况非常相似。在这里,您也可以选择使用 Redis 还是 RabbitMQ 作为结果后端的应用程序。
为了添加所有必要的设置并激活代理和结果后端服务,我们必须对之前的代码进行修改和添加,这一次我们将完成通过 Dramatiq 发布的实现的全貌。
至于管理服务器端的代码,我们将对其进行如下修改。
import dramatiq
import time
from dramatiq.brokers.redis import RedisBroker
from dramatiq.brokers.rabbitmq import RabbitmqBroker
from dramatiq.results import Results
from dramatiq.results.backends import RedisBackend
broker = RedisBroker(host=”localhost”)
broker = RabbitmqBroker(host=”localhost”)
dramatiq.set_broker(broker)
result_backend = RedisBackend(host=”localhost”)
broker.add_middleware(Results(backend=result_backend))
@dramatiq.actor(store_results=True)
def wait(t,n):
time.sleep(t)
print(“I am the actor %s and I will wait for %s secs” %(n,t))
return “I waited for {0} secs”.format(t)
正如我们在代码中看到的,我们增加了很多内容。在第一部分,这次必须导入所有负责 Broker 和 Result Backend 服务的类。在我们的示例中,我们决定使用 Redis 来实现这两个功能。不过,我在与 RabbitMQ 相对应的类上留下了注释,以便为希望使用其他应用程序的用户提供参考。由于示例是在本地的同一台计算机上进行的,因此我输入了本地主机作为两个服务的主机。如果要在网络上运行,请将这些值替换为相应的 IP 地址。至于 actor,我们添加了一个返回值,它仍然是一个字符串。在这种情况下,我们的目的是从客户端在控制台上显示这个字符串。
我们可以看到,在客户端代码中也需要导入与客户端相同的经纪人和结果后台服务类。通过指定不同的 IP,这通常也有助于让客户端了解组成 Dramatiq 分布式系统的服务在哪里。
至于行动者,这次它们被聚集在一起,并在一定时间内将其结果保存在结果后台的内存中。通过 get_results()(获取结果)方法,可以从组中获取这些结果。默认情况下,结果在结果后台的停留时间为 10 秒,而在我们的示例中,我们的工作时间就是这些时间,因此最好将超时时间延长至 12 秒(12000 毫秒)。
在我们制作的 Dramatiq 系统中运行这两段代码,就能在客户端控制台上看到结果:
I waited for 10 secs
I waited for 5 secs
I waited for 4 secs
I waited for 7 secs
End Program
在服务器端控制台
I am the actor C and I will wait for 4 secs
I am the actor B and I will wait for 5 secs
I am the actor D and I will wait for 7 secs
I am the actor A and I will wait for 10 secs
5.4 SCOOP 库
另一个使用 Python 进行分布式计算的在线库是 Scalable Concurrent Operation in Python (SCOOP)。该项目仍处于测试阶段(目前为 0.7 版),由一个名为 scoop 的模块组成,其任务是将并发任务(此处称为期货)分配给分布在网络中不同机器上的工作进程:
SCOOP 开发人员为自己设定的主要目标是尽可能保持分布式并行编程管理的简单和透明。通过映射和缩减机制,SCOOP 能够在多个处理层管理不同的任务。然后,该模块将根据连接到系统的机器的物理潜力和在不同情况下应用的并行化方法,对如何分配任务的可能性进行物理考虑。
至于构成体系结构的元素,SCOOP 比 Celery 等分布式系统要简单得多,因为它只有 WORKERS(都是一样的)和一个 BROKER,
通信系统的核心要素是经纪人,他与所有独立的 Worker 交互,在它们之间调度信息。期货是在工作者元素中创建的,而不是通过集中序列化程序在经纪人中创建的。这使得架构更加可靠,并提高了性能。事实上,经纪人的主要工作是在 Worker 中的各个进程之间进行联网和 I/O 交换,响应时间相对较短。
5.4.1 安装 SCOOP
要安装 SCOOP,我们可以使用之前创建的虚拟环境,也可以专门创建一个新的虚拟环境。激活虚拟环境后,我们打开命令行控制台,使用 pip 安装 SCOOP:
pip install SCOOP
要直接测试 SCOOP,我们在文本编辑器中编写以下代码,然后将其保存为 scoopy.py,放在虚拟环境的工作区目录下:
from scoop import futures
def worker(value):
print(“I am the Worker %s” %value)
if name == “main“:
list(futures.map(worker, range(4)))
在运行代码之前,让我们先快速浏览一下。从 scoop 模块中导入 futures,它只不过是将由 broker 分配给 Worker 的任务池。代理通过 map() 方法创建任务,该方法与标准库的方法一样,接受一个可迭代参数作为第二个参数,在本例中是 range(4),它只不过是一个由四个元素组成的升序整数序列:(0,1,2,3)。map()函数必须作为参数传递给 list()函数,以便在所有 Worker 执行结束时实现同步。这样,程序将等待所有 Worker 的执行结果,然后再停止程序。
迭代器中的每一个元素都会传递给 map() 的第一个参数,即 worker,这只不过是代码中定义的 worker() 函数的名称。该函数包含了分配给每个 Worker 的操作。因此,在我们的示例中,我们将有四个任务(可迭代元素的个数)分配给可用的 worker,每个 worker 都将打印一个字符串,并在其中添加自己的标识号,该标识号与可迭代元素的值完全一致。
要运行使用 SCOOP 的代码,需要在 Python 代码执行命令中指定 -m scoop 选项。因此,让我们写
python -m scoopy.py
执行代码后,控制台输出将与下面类似:
[2022-09-26 15:01:08,933] launcher INFO SCOOP 0.7 2.0 on win32 using Python 3.10.4 | packaged by conda-forge | (main, Mar 30 2022, 08:38:02) [MSC v.1916 64 bit (AMD64)], API: 1013
[2022-09-26 15:01:08,933] launcher INFO Deploying 12 worker(s) over 1 host(s).
[2022-09-26 15:01:08,933] launcher INFO Worker distribution:
[2022-09-26 15:01:08,933] launcher INFO 127.0.0.1: 11 + origin
Launching 12 worker(s) using an unknown shell.
I am the Worker 0
I am the Worker 1
I am the Worker 3
I am the Worker 2
[2022-09-26 15:01:09,934] launcher (127.0.0.1:64064) INFO Root process is done.
[2022-09-26 15:01:09,934] launcher (127.0.0.1:64064) INFO Finished cleaning spawned subprocesses.
从执行结果可以看出,SCOOP 在我的计算机上自动创建了 12 个 Worker(11 + origin)。这个值是 SCOOP 根据执行程序的机器的特性自动设置的。我们可以从各个 worker() 函数打印的输出中看到,这四个任务是并发执行的,执行顺序是随机的。
让我们将前面的示例变得更加复杂,在调用的函数中添加一个带有结果的数学计算,我们将其称为 func():
from scoop import futures
import math
import numpy as np
def func(value):
result = math.sqrt(value)
print(“The value %s and the elaboration is %s” %(value, result))
return result
if name == “main“:
data = np.array([10,3,6,1,4,8,25,9])
results = list(futures.map(func, data))
for result in results:
print(“This is the result: %s” %result)
从代码中我们可以看到,现在由 Worker 处理的 func() 函数除了写入字符串作为输出外,还会将结果返回给代理,然后将其传递给 map() 函数的返回值,并通过 list() 保存在结果变量中。我们还用一个包含 8 个元素的 NumPy 数组替换了有序序列 range(4)。程序结束时,结果将打印在输出中。
让我们将代码保存到一个文件中,并将其称为 scoopy2.py。在上一个示例中,我们看到 SCOOP 自动创建了 12 个 Worker。通过在 python 命令中添加 -n 选项,可以在每条命令行中设置这一数量。例如,如果我们想使用 4 个 Worker,则必须在虚拟环境控制台运行以下命令:
python -m scoop -n 4 scoopy2.py
运行代码将产生类似下面的结果:
[2022-09-26 15:12:25,473] launcher INFO SCOOP 0.7 2.0 on win32 using Python 3.10.4 | packaged by conda-forge | (main, Mar 30 2022, 08:38:02) [MSC v.1916 64 bit (AMD64)], API: 1013
[2022-09-26 15:12:25,473] launcher INFO Deploying 4 worker(s) over 1 host(s).
[2022-09-26 15:12:25,489] launcher INFO Worker distribution:
[2022-09-26 15:12:25,489] launcher INFO 127.0.0.1: 3 + origin
Launching 4 worker(s) using an unknown shell.
The value 10 and the elaboration is 3.1622776601683795
The value 3 and the elaboration is 1.7320508075688772
The value 6 and the elaboration is 2.449489742783178
The value 1 and the elaboration is 1.0
The value 4 and the elaboration is 2.0
The value 8 and the elaboration is 2.8284271247461903
The value 25 and the elaboration is 5.0
The value 9 and the elaboration is 3.0
This is the result: 3.1622776601683795
This is the result: 1.7320508075688772
This is the result: 2.449489742783178
This is the result: 1.0
This is the result: 2.0
This is the result: 2.8284271247461903
This is the result: 5.0
This is the result: 3.0
[2022-09-26 15:12:26,474] launcher (127.0.0.1:64354) INFO Root process is done.
[2022-09-26 15:12:26,474] launcher (127.0.0.1:64354) INFO Finished cleaning spawned subprocesses.
从结果中我们可以看到,各种任务的执行顺序仍然是随机的。但在结果输出中,需要注意的是,list(map()) 将按照 NumPy 数组中定义的顺序返回所有结果。因此,在这一阶段有一个同步阶段,即等待所有结果的结果值 result,也就是返回的变量。
既然我们已经了解了 SCOOP 架构是如何通过映射来管理任务的,那么让我们进一步增加还原步骤。在前面的示例中,我们看到了如何通过一个包含 n 个元素的可迭代器,得到一个包含 n 个结果的可迭代器。现在,我们可以增加一个还原阶段,为结果添加一个聚合还原函数。例如,我们可以用一个还原函数(如 sum() 函数)来代替函数 list(),该函数用于同步不同工作进程的所有结果并将它们排序到一个列表中。该函数将汇总不同工作进程的所有结果。无论执行时间长短,它都会逐步累加所有结果,直到最后一个 Worker 结束,然后返回结果。我们将再次使用同步机制。
让我们以下面的方式修改之前的代码:
if name == “main”:
data = np.array([10,3,6,1,4,8,25,9])
result = sum(futures.map(func, data))
print(“This is the reduction result: %s” %result)
执行后,您将得到缩减的结果,即所有工作者的结果之和:
…
This is the reduction result: 21.172245335266624
…
在 SCOOP 中,还有一种方法包含了前面的两种操作,即映射和缩减:mapReduce()。我们继续修改之前的代码,导入操作符模块,它允许我们以方法的形式调用操作符。在我们的示例中,我们将使用 add() 进行加法运算。最后,我们将 map() 替换为 mapReduce(),并添加聚合方法 operator.add 作为第二个应用函数:
from scoop import futures
import math
import numpy as np
import random
def func(value):
result = math.sqrt(value)
print(“The value %s and the elaboration is %s” %(value, result))
return result
if name == “main“:
data = np.array([10,3,6,1,4,8,25,9])
result = sum(futures.map(func, data))
print(“This is the reduction result: %s” %result)
运行前面的代码,就会得到与前面相同的结果。如果将映射和还原函数串联起来,问题会变得更加复杂。
执行后得到如下结果:
[2022-09-26 16:09:50,661] launcher INFO Worker distribution:
[2022-09-26 16:09:50,661] launcher INFO 127.0.0.1: 3 + origin
Launching 4 worker(s) using an unknown shell.
The value 10 and the elaboration is 3.1622776601683795
The value 3 and the elaboration is 1.7320508075688772
The value 6 and the elaboration is 2.449489742783178
The value 1 and the elaboration is 1.0
The value 4 and the elaboration is 2.0
The value 8 and the elaboration is 2.8284271247461903
The value 25 and the elaboration is 5.0
The value 9 and the elaboration is 3.0
This is the reduction result: 2.375
[2022-09-26 16:09:52,039] launcher (127.0.0.1:65396) INFO Root process is done.
[2022-09-26 16:09:52,054] launcher (127.0.0.1:65396) INFO Finished cleaning spawned subprocesses.
我们可以看到,事情会逐渐变得复杂,但分布式计算的功能却越来越强大。
5.5 结论
本章介绍了分布式系统的概念,以及它们如何进入并行编程环境。此外,我们还特别讨论了 Celery 框架,它目前是用 Python 实现分布式系统的主要参考点。特别是,我们看到了并发任务计算方面最有用的功能。随后,我们还看到了该框架的两个可能替代方案,它们为我们的环境提供了类似的解决方案: Dramatiq 和 SCOOP。
声明:文中观点不代表本站立场。本文传送门:https://eyangzhen.com/424715.html