- 内容提要
- 前言
- 作者简介
- 封面简介
- 第1章 理解高性能 Python
- 第2章 通过性能分析找到瓶颈
- 2.1 高效地分析性能
- 2.2 Julia 集合的介绍
- 2.3 计算完整的 Julia 集合
- 2.4 计时的简单方法——打印和修饰
- 2.5 用 UNIX 的 time 命令进行简单的计时
- 2.6 使用 cProfile 模块
- 2.7 用 runsnakerun 对 cProfile 的输出进行可视化
- 2.8 用 line_profiler 进行逐行分析
- 2.9 用 memory_profiler 诊断内存的用量
- 2.10 用 heapy 调查堆上的对象
- 2.11 用 dowser 实时画出变量的实例
- 2.12 用 dis 模块检查 CPython 字节码
- 2.13 在优化期间进行单元测试保持代码的正确性
- 2.14 确保性能分析成功的策略
- 2.15 小结
- 第3章 列表和元组
- 第4章 字典和集合
- 第5章 迭代器和生成器
- 第6章 矩阵和矢量计算
- 第7章 编译成 C
- 第8章 并发
- 第9章 multiprocessing 模块
- 第10章 集群和工作队列
- 第11章 使用更少的 RAM
- 第12章 现场教训
10.6 三个集群化解决方案
在下面几节中,我们介绍Parallel Python、IPython Parallel和NSQ。
Parallel Python有一个很熟悉multiprocessing的接口。把你的multiprocessing解决方案从一台单独的多核机器升级到多机器的设置就是个几分钟内的活。Parallel Python几乎没有依赖性,容易在一个本地集群中为研究工作做配置。它不是很强大还缺少通信机制,但是对于发送令人为难的并行任务来给一个小规模的局部集群来说,是很容易使用的。
IPython集群是很容易在一台多核机器上使用的。既然许多研究者使用IPython作为它们的 shell,使用它来做并行任务控制也是很自然的。构建一个集群需要一点系统管理知识,而且有一些依赖性(例如ZeroMQ),所以设置起来比Parallel Python稍微复杂一点。IPython Parallel的一个巨大胜利就是你能够如本地集群一样来使用远端集群这样一个事实(例如,使用亚马逊AWS和EC2)。
NSQ是一个可随时投入生产的队列系统,在类似Bitly那样的公司中所使用。它有持久性(所以如果机器挂了,任务就能够被其他的机器重新捡起)和强大的可扩展机制。它具有更强大的能力,对系统管理和工程技巧的需求更大一些。
10.6.1 为简单的本地集群使用Parallel Python模块
Parallel Python(pp)模块让本地的工作者集群能够使用一个类似于multiprocessing的接口。显而易见,那意味着把代码从使用map的multiprocessing转变为Parallel Python是很方便的。你能够轻易地使用一台机器或一个临时网络来运行代码。你能够使用pip install pp来安装它。
我们能够通过Parallel Python来使用蒙特卡罗方法,就如我们在9.3节中使用本地机器所做的那样——注意在例10-1中,这个接口与更早multiprocessing的例子是多么相似啊。我们在nbr_trials_per_process中创建了一个工作列表,并且把这些任务传递给4个本地进程。我们能够创建所需数量的工作项,当工作者变空闲时,它们会被消费。
例10-1 Parallel Python的本地例子
... import pp NBR_ESTIMATES = 1e8 def calculate_pi(nbr_estimates): steps = xrange(int(nbr_estimates)) nbr_trials_in_unit_circle = 0 for step in steps: x = random.uniform(0, 1) y = random.uniform(0, 1) is_in_unit_circle = x * x + y * y <= 1.0 nbr_trials_in_unit_circle += is_in_unit_circle return nbr_trials_in_unit_circle if __name__ == "__main__": NBR_PROCESSES = 4 job_server = pp.Server(ncpus=NBR_PROCESSES) print "Starting pp with", job_server.get_ncpus(), "workers" nbr_trials_per_process = [NBR_ESTIMATES] * NBR_PROCESSES jobs = [] for input_args in nbr_trials_per_process: job = job_server.submit(calculate_pi, (input_args,), (), ("random",)) jobs.append(job) # each job blocks until the result is ready nbr_in_unit_circles = [job() for job in jobs] print "Amount of work:", sum(nbr_trials_per_process) print sum(nbr_in_unit_circles) * 4 / NBR_ESTIMATES / NBR_PROCESSES
在例10-2中,我们拓展了例子——这次我们需要1024个任务做100000000次估算,每一次用动态配置的集群。在远端机器上,我们能够运行python ppserver.py –w 4 –a –d,远端服务器将用4个处理器(在Ian的笔记本电脑上默认会是8个,但是我们不想使用4个超线程,所以我们选择了4个CPU),使用自动连接和调试日志。调试日志在屏幕上打印调试信息,对于检查工作是否已经接收到来说,这是有用的。自动连接标记意味着我们不必声明IP地址,我们让pp自己广播并连接到服务器上。
例10-2 在集群上的Parallel Python
... NBR_JOBS = 1024 NBR_LOCAL_CPUS = 4 ppservers = ("*",) # set IP list to be autodiscovered job_server = pp.Server(ppservers=ppservers, ncpus=NBR_LOCAL_CPUS) print "Starting pp with", job_server.get_ncpus(), "local workers" nbr_trials_per_process = [NBR_ESTIMATES] * NBR_JOBS jobs = [] for input_args in nbr_trials_per_process: job = job_server.submit(calculate_pi, (input_args,), (), ("random",)) jobs.append(job) ...
使用第二台强力的笔记本电脑来运行,计算时间大概减半了。另一方面,一台具有单CPU的老式MacBook几乎没有帮助——它通常以如此慢的速度计算其中一项任务,导致快速的笔记本电脑空闲下来,没有更多的工作来运行,所以整体完成时间比只使用一台快速的笔记本电脑更长久。
这是一个很有用的方法来开始为轻量级的计算任务构建一个临时的集群。你可能不想要在生产环境中使用它(Celery或者GearMan可能是一个更好的选择),但是对于研究目的和易扩展性来说,当知悉一个相关的问题时,它就会让你快速取胜。
pp无法帮助来分发代码或静态数据给远端的机器,你不得不移动外部库(例如,你可能已经编译成一个静态库的任何东西)到远端机器,并且提供任何的共享数据。它能够序列化(pickle)要运行的代码,处理额外的导入以及你从控制进程所提供的数据。
10.6.2 使用IPython Parallel来支持研究
对IPython集群的支持通过ipcluster到来了。IPython成为了一个本地和远程处理引擎的接口,数据能够在引擎之间被推送,任务能够被推送到远端机器上。远程调试是有可能的,对消息传递接口(MPI)的支持是可选的。相同的通信机制让IPython Notebook接口变得强大。
这对研究设置来说意义巨大——你能够把任务推送到一个在本地集群中的机器,做交互,如果有问题就调试,把数据推送到机器中,并把结果收集回来,所有这一切都以交互的方式来进行。也要注意PyPy运行IPython和IPython Parallel。这个组合可能是非常强大的(如果你不使用numpy)。
在幕后,ZeroMQ被用来作为消息中间件,所以你需要安装它。如果你在局域网中构建集群,你可以避免使用SSH身份验证。如果你需要一定的安全性,那么它完全支持SSH,但却让配置变得更加复杂一点——从一个可信的局域网上开始,接着当你知悉每个组件如何工作时再扩建。
项目被拆分成4个组件。引擎是一个运行代码的同步Python解释器。你会运行一组引擎来开启并行计算。控制器提供了引擎的接口,它负责任务分发,并提供了一个直接接口和一个负载均衡接口来提供任务调度。一个中心枢纽用来跟踪引擎、调度器和客户端。调度器隐藏了引擎的同步本质,提供了一个异步接口。
在笔记本电脑上,我们用ipcluster start –n 4来启动4个引擎。在例10-3中,我们启动IPython并检查一个本地Client能否看到我们的4个本地引擎。我们能够使用c[:]来寻址所有4个引擎,我们把一个函数应用于每一个引擎——apply_ sync采用了一个可调用函数,这样我们就提供了一个返回字符串的零参数lambda表达式。4个引擎中的每一个会运行其中一个函数,并返回相同的结果。
例10-3 测试我们是否能用IPython看到本地引擎
In [1]: from IPython.parallel import Client In [2]: c = Client() In [3]: print c.ids [0, 1, 2, 3] In [4]: c[:].apply_sync(lambda:"Hello High Performance Pythonistas!") Out[4]: ['Hello High Performance Pythonistas!', 'Hello High Performance Pythonistas!', 'Hello High Performance Pythonistas!', 'Hello High Performance Pythonistas!']
创建我们的引擎之后,现在它们处于空白状态。如果我们在本地导入模块,它们不会被导入远程引擎。一个既在本地导入又在远端导入的干净的办法就是使用sync_import上下文管理器。在例10-4中,我们将在本地IPython和4个连接的引擎上导入os,接着在4个引擎上再次调用apply_sync来获取它们的PIDs。如果我们不做远程导入,我们会得到一个NameError,因为远程引擎不知道os模块。我们也能使用execue来在引擎上远程运行任何的Python命令。
例10-4 把模块导入远程引擎
In [5]: dview=c[:] # this is a direct view (not a load-balanced view) In [6]: with dview.sync_imports(): ....: import os ....: importing os on engine(s) In [7]: dview.apply_sync(lambda:os.getpid()) Out[7]: [15079, 15080, 15081, 15089] In [8]: dview.execute("import sys") # another way to execute commands remotely
你会想要把数据推送到引擎。在例10-5中显示的push命令让你发送字典项来加入每个引擎的全局名字空间。有相应的pull来获取这些项:你指定键,它就会从每个引擎返回对应的值。
例10-5 把共享数据推送到引擎
In [9]: dview.push({'shared_data':[50, 100]}) Out[9]: <AsyncResult: _push> In [10]: dview.apply_sync(lambda:len(shared_data)) Out[10]: [2, 2, 2, 2]
现在让我们给集群增加第二台机器。首先,我们将杀死之前所创建的ipengine引擎并结束掉IPython。我们会从一个干净的状态开始。你将需要第二台可用的机器,配置有SSH来允许你自动登入。
在例10-6中,我们会为集群创建一个新画像。一组配置文件被放入<HOME>/.ipthon/ profile_mycluster目录下。引擎默认被配置成只接受来自localhost的连接,而不接受来自外部设备的连接。编辑ipengine_config.py来配置HubFactory,以便接受外部的连接,保存起来,接着使用新画像来启动ipcluster。我们会回到4个本地引擎。
例10-6 创建一个接受公共连接的本地画像
$ ipython profile create mycluster --parallel $ gvim /home/ian/.ipython/profile_mycluster/ipengine_config.py # add "c.HubFactory.ip = '*'" near the top $ ipcluster start -n 4 --profile=mycluster
接下来我们需要把这个配置文件传递给我们的远程机器。在例10-7中,我们使用scp来把ipcontroller-engine.json(在我们启动ipcluster的时候所创建)拷贝到远程机器的.config/ipython/profile_default/security目录下。一旦拷贝完成,就在远程机器上运行ipengine。它会在默认目录下查找ipcontroller-engine.json,如果成功连接了,接着你就会看到类似在这里所显示的消息。
例10-7 把编辑好的画像拷贝到远程机器并做测试
# On the local machine $ scp /home/ian/.ipython/profile_mycluster/security/ipcontroller-engine.json ian@192.168.0.16:/home/ian/.config/ipython/profile_default/security/ # Now on the remote machine ian@ubuntu:~$ ipengine ...Using existing profile dir: u'/home/ian/.config/ipython/profile_default' ...Loading url_file u'/home/ian/.config/ipython/profile_default/security/ ipcontroller-engine.json' ...Registering with controller at tcp://192.168.0.128:35963 ...Starting to monitor the heartbeat signal from the hub every 3010 ms. ...Using existing profile dir: u'/home/ian/.config/ipython/profile_default' ...Completed registration with id 4
让我们测试配置。在例10-8中我们会使用新画像启动一个本地IPython shell。我们会获取5个客户端列表(4个本地,1个远程),接着我们会请求Python的版本信息——我们能看到在远程机器上,我们正使用着Anaconda发布版。我们只得到了一个额外的引擎,因为在这个案例中,远程机器是一个单核的MacBook。
例10-8 测试新机器是集群的一部分
$ ipython --profile=mycluster Python 2.7.5+ (default, Sep 19 2013, 13:48:49) Type "copyright", "credits" or "license" for more information. IPython 1.1.0—An enhanced Interactive Python. ... In [1]: from IPython.parallel import Client In [2]: c = Client() In [3]: c.ids Out[3]: [0, 1, 2, 3, 4] In [4]: dview=c[:] In [5]: with dview.sync_imports(): ...: import sys In [6]: dview.apply_sync(lambda:sys.version) Out[6]: ['2.7.5+ (default, Sep 19 2013, 13:48:49) \n[GCC 4.8.1]', '2.7.5+ (default, Sep 19 2013, 13:48:49) \n[GCC 4.8.1]', '2.7.5+ (default, Sep 19 2013, 13:48:49) \n[GCC 4.8.1]', '2.7.5+ (default, Sep 19 2013, 13:48:49) \n[GCC 4.8.1]', '2.7.6 |Anaconda 1.9.2 (64-bit)| (default, Jan 17 2014, 10:13:17) \n [GCC 4.1.2 20080704 (Red Hat 4.1.2-54)]']
让我们把这一切放在一起。在例10-9中,我们将使用5个引擎来估算pi,就如我们在10.6.1节中所做的那样。这次我们将使用@require装饰器来在引擎中导入random模块。我们使用一个直接的视图来把我们的工作发送到引擎上,这会阻塞在那里直到所有的结果返回过来。接着我们就像以前所做的那样来估算pi。
例10-9 使用我们的本地集群估算pi
from IPython.parallel import Client, require NBR_ESTIMATES = 1e8 @require('random') def calculate_pi(nbr_estimates): ... return nbr_trials_in_unit_circle if __name__ == "__main__": c = Client() nbr_engines = len(c.ids) print "We're using {} engines".format(nbr_engines) dview = c[:] nbr_in_unit_circles = dview.apply_sync(calculate_pi, NBR_ESTIMATES) print "Estimates made:", nbr_in_unit_circles # work using the engines only nbr_jobs = len(nbr_in_unit_circles) print sum(nbr_in_unit_circles) * 4 / NBR_ESTIMATES / nbr_jobs
IPython Parallel提供了比在这儿所展示的要多得多的功能。异步任务和在更大的输入区间上的映射当然是可能的。它也有一个CompositeError类,这是一个更高层次的异常,用来封装发生于多个引擎上的相同的异常(如果你部署了糟糕的代码,你不会接收到多个完全相同的异常!)。当你处理多个引擎时,这就是一个便利。
IPython Parallel的一个尤其强大的特性就是允许你使用更大的集群环境,包括超级计算机和类似于亚马逊EC2的云服务。为了进一步方便这种类型的集群开发,Anaconda发布版包含了对StarCluster的支持。Olivier Grisel在PyCon 2013上给出了一个使用scikit-learn来做高级机器学习的一本优秀的教材。在两个小时内,他演示了使用StarCluster在亚马逊的EC2临时实例上通过IPython Parallel来做机器学习。
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论