返回介绍

12.2 用 Jug 程序包把你的处理流程分解成几个任务

发布于 2024-01-30 22:34:09 字数 6328 浏览 0 评论 0 收藏 0

通常,我们有一个简单处理流程:先对初始数据进行预处理,计算特征,然后在生成的特征上调用一个机器学习算法。

Jug是一个程序包,它是由Luis PedrCoelho (本书的作者之一)开发的。它是开源的,适用于很多领域,但它是专为数据分析问题设计的。它能同时解决几个问题,如下所示。

它可以把结果记录在磁盘上(或一个数据库),这意味着,如果你让它计算一些曾经计算过的东西,那它可以直接从磁盘里读取结果。

它可以利用多核处理器,或者甚至一个集群里的多台主机。Jug在批处理计算环境中也工作得非常不错。批处理计算环境就是一个排队系统,如便携式批处理系统 (Portable Batch System,PBS)、负载共享系统 (Load Sharing Facility,LSF)或Oracle网格引擎 (Oracle Grid Engine,OGE,之前叫做Sun网格引擎,即Sun Grid Engine)。本章后面将会用到它,届时我们将构建在线集群并把作业分发给它们。

12.2.1 关于任务

任务是Jug的基本构件。一个任务 就是一个函数以及它的参数值,例如:

def double(x): return 2*x

一个任务可以是“用参数值3 调用double ,另一个任务可以是“用参数642.34 调用double ”。用Jug,我们可以按如下方式构建任务:

from jug import Task t1 = Task(double, 3) t2 = Task(double, 642.34)

把它保存在一个名为jugfile.py(这是一个正常的Python文件)的文件里。现在,我们运行jug execute 来执行任务。它是在命令行下执行的,而不是Python提示符!我们运行的是jug execute ,而不是Python的jugfile.py文件(它什么也没做)。

你可以从任务中得到一些反馈(Jug会告诉你,两个名为“double”的任务正在运行)。再次运行jug execute ,它会告诉你它并没做什么。它也并不需要做。在这种情况下,我们所得很少。但是如果任务需要花费很长时间来计算,那这个信息就会很有用处。

也许你已经注意到了,一个名为jugfile.jugdata的新目录出现在硬盘上,它包括一些命名怪异的文件。这就是记忆化缓存。如果你把它删除了,那jug execute 就会再次运行所有任务(两个任务)。

注意  区分纯函数(仅仅接受输入返回结果)和更一般的函数(可以进行一些操作,如读文件、写文件、获取全局变量、修改参数,或者其他编程语言允许的操作)通常都是有益处的。一些编程语言,如Haskell,甚至会在语法上区分纯函数和不纯的函数。

使用Jug,你的任务将不会是完全纯粹的。我们甚至推荐你在任务中读取数据或写下结果。然而,获取和修改全局变量却不会有好结果;这些任务可能是以任意顺序在不同处理器上执行的。不过全局常数是个例外,但它也可能会让记忆系统混淆(如果数值在不同次运行中改变了)。类似的,你也不能修改输入数据。Jug有一个调试模式(用jug execute-debug ),它会使你的计算变慢,但当你犯这类错误的时候,它会给出有用的错误信息。

前面这些代码可以工作,但它用起来有一些麻烦;你总要重复构造Task(function, argument) 。如果使用一点Python中的魔法,我们可以让代码更加自然:

from jug import TaskGenerator from time import sleep @TaskGenerator def double(x): sleep(4) return 2*x @TaskGenerator def add(a, b): return a + b @TaskGenerator def print_final_result(oname, value): with open(oname, 'w') as output: print >>output, "Final result:", value y = double(2) z = double(y) y2 = double(7) z2 = double(y2) print_final_result('output.txt', add(z,z2))

除了使用TaskGenerator ,前面这段代码就是标准的Python代码。然而,使用TaskGenerator ,它实际上会创建了一系列任务,这样就可以利用多处理器来运行任务了。在后台,修饰器将你的函数转换形式,使得它们实际上并没有被执行,而是创建了一个任务。我们还可以利用“我们能把任务传递给其他任务”这个事实,但这样会导致依赖关系的出现。

你可能已经注意到,我们在前面的代码中加入了一些sleep(4) 的调用。它模拟了长时间运算的运行状态。否则,这段代码会执行得很快,没有地方需要使用多处理器。

我们从运行jug status 开始:

现在我们同时开启两个进程(在后台):

jug execute &
jug execute &

我们现在再次运行jug status :

我们可以看到,两个初始的double操作正在同时运行。大约8秒之后,整个过程将会结束,运行结果会写入output.txt文件。

顺便说一下,如果执行的文件不是jugfile.py,那么你需要在命令行中明确地指定:

jug execute MYFILE.py

这是不使用jugfile.py这个名字的唯一缺点。

12.2.2 复用部分结果

例如,我们想要加入一个新特征(或者一组特征)。如我们在第10章中所看到的那样,可以通过修改计算代码很容易地达到这个目的。但是,这意味着需要重新计算所有特征。这是一种浪费,特别是在我们希望快速测试新特征和新技术的时候:

@TaskGenerator def new_features(im): import mahotas as mh im = mh.imread(fname, as_grey=1) es = mh.sobel(im, just_filter=1) return np.array([np.dot(es.ravel(), es.ravel())]) hfeatures = as_array([hfeature(f) for f in filenames]) efeatures = as_array([new_feature(f) for f in filenames]) features = Task(np.hstack, [hfeatures, efeatures]) # 学习代码…

现在你再运行一次jug execute 。新特征将会计算出来,而老特征会从缓存中读取出来。逻辑回归代码也会再运行一次,因为它的结果取决于所有特征,而这些特征现在已经不一样了。

这就是Jug非常强悍的地方;它可以确保在不浪费计算资源的情况下,帮我们获得想要的结果。

12.2.3 幕后的工作原理

Jug是怎样工作的?在最基本的层面上,它非常简单;一个任务就是一个函数加上它的参数。它的参数可能是一些数值,也可能是其他任务。如果一个任务包含另一个任务,那么这两个任务之间就有了依赖关系(在第一个任务得到结果之前,第二个无法运行)。

基于此,Jug对每一个任务都递归地计算一个散列函数。散列值就是对整个计算进行的编码。当你运行jug execute 的时候,会有一个小循环,如下面的代码片段所示:

for t in alltasks: if t.has_not_run() and not backend_has_value(t.hash()): value = t.execute() save_to_backend(value, key=t.hash())

由于加锁机制的问题,真实的循环要比现在复杂得多,但基本理念和前面那段代码所示的一样。

默认的后端会把文件写到磁盘里(在一个名为jugfile.jugdata/的有趣目录里)。我们也可以使用另一个采用了Redis数据库的后端。通过适当的加锁机制,它还允许多个处理器同时执行任务;这些处理器会独立地看待所有任务,并运行尚未被执行的任务,然后把结果写回共享后端。这个过程可以在单机上运行,也可以在多台主机上运行,只要机器可以访问相同的后端。(例如,使用网络磁盘或者Redis数据库。)本章后面,我们将会探讨计算机集群。但现在,让我们先把注意力集中在多核处理器上。

你还可以了解到为什么要记录中间结果。如果某个任务的结果在后端已经有了,那么这个任务就不会再次执行。另一方面,如果你对任务做了改动,即使变动很小(改变了1个参数),那它的散列值也会变化。因此,这个任务会重新计算。此外,所有依赖于它的其他任务,散列值也会相应改变,它们也会重新计算。

12.2.4 用Jug分析数据

Jug是一个通用的框架,但在理想情况下,它适用于中等规模的数据分析。在开发自己的分析流程时,你最好把中间结果保存下来。如果你之前已经做过预处理,而这个步骤只改变了你所计算的特征,那么你肯定不愿意再进行一遍预处理。如果已经计算出了特征,但想要把一些新特征融合进来,那么你也不愿意重新计算一遍其他特征。

Jug是特地为numpy 数组而优化过的。所以,无论何时任务返回或接收numpy 数组,你都可以利用到这种优化。Jug是这个协同工作的生态系统中的一部分。

现在回顾一下第10章,尤其是其中如何计算图像特征那部分。相信你一定还记得,当时我们读取了图像文件,计算了特征,把特征组合在一起,进行归一化,最后学习了如何创建分类器。接下来,我们重新做一遍,不过,这一次使用的是Jug。这一版的优点在于,我们能够在不重新计算所有原有特征的情况下增加一些新特征。

我们从引入一些程序库开始:

from jug import TaskGenerator

现在定义一个任务生成器,来计算特征:

@TaskGenerator def hfeatures(fname): import mahotas as mh import numpy as np im = mh.imread(fname, as_grey=1) im = mh.stretch(im) h = mh.features.haralick(im) return np.hstack([h.ptp(0), h.mean(0)])

注意,我们在函数里面只引入了numpy 和mahotas 。这是一个小优化;用这种方式,只有在任务运行的时候模块才会加载。现在我们设置图像文件名,如下所示:

filenames = glob('dataset/*.jpg')

我们可以把TaskGenerator 应用于任何函数,甚至是在那些并非我们所写的函数里,例如numpy.array :

import numpy as np as_array = TaskGenerator(np.array) # 计算所有特征 features = as_array([hfeature(f) for f in filenames]) # 获取标签数组 labels = map(label_for, f) res = perform_cross_validation(features, labels) @TaskGenerator def write_result(ofname, value): with open(ofname, 'w') as out: print >>out, "Result is:", value write_result('output.txt', res)

使用Jug的一个很小的不便之处在于,我们必须像前面这个例子那样,把函数的结果输出到文件。这是享受Jug额外方便时的一点小代价。

注意  本章并没有涉及Jug的所有特性,但这里有一个总结,是关于我们在正文里没有涉及但非常有意思的一些特性。

jug invalidate  这个特性是说,一个给定函数里的所有结果,都应当被看作无效结果,需要重新计算。那些依赖于无效结果的下游计算也要重新计算。

jug status --cache  如果jug status 耗费的时间太长,可以用--cache 标志对状态进行缓存,使之加速。注意,它并不能检测到jugfile.py的任何改动,但你可以一直使用--cache --clear 来删除缓存并重新启动。

jug clearnup  这个特性会把记忆缓存中的所有额外文件都删掉。这是一个垃圾回收操作。

还有一些其他的高级特性,例如它允许你查看jugfile.py里计算过的数值。你可以读一下Jug文档中关于“barriers”的使用说明(线上地址 http://jug.rtfd.org )。

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。
列表为空,暂无数据
    我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
    原文