- 内容提要
- 前言
- 作者简介
- 封面简介
- 第1章 理解高性能 Python
- 第2章 通过性能分析找到瓶颈
- 2.1 高效地分析性能
- 2.2 Julia 集合的介绍
- 2.3 计算完整的 Julia 集合
- 2.4 计时的简单方法——打印和修饰
- 2.5 用 UNIX 的 time 命令进行简单的计时
- 2.6 使用 cProfile 模块
- 2.7 用 runsnakerun 对 cProfile 的输出进行可视化
- 2.8 用 line_profiler 进行逐行分析
- 2.9 用 memory_profiler 诊断内存的用量
- 2.10 用 heapy 调查堆上的对象
- 2.11 用 dowser 实时画出变量的实例
- 2.12 用 dis 模块检查 CPython 字节码
- 2.13 在优化期间进行单元测试保持代码的正确性
- 2.14 确保性能分析成功的策略
- 2.15 小结
- 第3章 列表和元组
- 第4章 字典和集合
- 第5章 迭代器和生成器
- 第6章 矩阵和矢量计算
- 第7章 编译成 C
- 第8章 并发
- 第9章 multiprocessing 模块
- 第10章 集群和工作队列
- 第11章 使用更少的 RAM
- 第12章 现场教训
5.2 生成器的延迟估值
之前提到,生成器之所以能够节约我们的内存是因为它只处理当前感兴趣的值。在我们计算的任意点,我们都只能访问当前的值,而无法访问数列中的其他元素(这种算法我们通常称为“单通”或“在线”)。有时候这会令生成器难以被使用,不过有很多模块和函数可以帮助解决这一问题。
我们主要关注标准库中的itertools库。它提供了Python内建函数map、reduce、filter和zip的生成器版本(在itertools中分别是imap、ireduce、ifilter和izip),以及其他很多有用的函数。特别值得注意的有:
islice
允许对一个无穷生成器进行切片。
chain
将多个生成器链接到一起。
takewhile
给生成器添加一个终止条件。
cycle
通过不断重复将一个有穷生成器变成无穷。
让我们创建一个使用生成器来分析大数据集的例子。假设我们现在有一个分析函数用于处理时间点数据,数据每秒会产生一个,对于过去的20年——我们有631 152 000个数据点!该数据被保存在一个文件中,每秒生成一行,我们无法将整个数据集读入内存。如果我们想要进行一些简单的异常检测,我们可以使用生成器而无须分配任何列表!
我们需要解决的问题是:对于一个格式为“timestamp, value”的数据文件,需要找到所有含有异常值的日期,比该日均值超出3倍标准差之外的数字被视为异常值。我们首先写读取文件的代码,文件以一行接着一行的方式读取,然后将每一行的值输出为一个Python对象。我们还会创建一个read_fake_data生成器用于生成伪造数据来测试我们的算法。对于这个函数,我们依然会提供一个filename参数,来让它具有和read_data相同的函数签名,不过该参数会被丢弃。这两个函数,如例5-2所示,使用了延迟估值——我们仅当生成器的next()属性被调用时才会去读取文件的下一行,或生成新的伪造数据。
例5-2 延迟读取数据
from random import normalvariate, rand from itertools import count def read_data(filename): with open(filename) as fd: for line in fd: data = line.strip().split(',') yield map(int, data) def read_fake_data(filename): for i in count(): sigma = rand() * 10 yield (i, normalvariate(0, sigma))
现在,我们可以使用itertools提供的groupby函数将同一天的timestamp分在一组(例5-3)。这个函数的输入参数是一组数据和一个用于分组的关键字函数。返回的则是一个生成器,生成的每一个值都是一个元组,元组内包含该组的关键字和该组所有元素的生成器。对于关键字函数,我们会创建一个lambda函数返回一个date对象。对于同一天的数据会产生相同的date对象,这样就能以天来分组。这个关键字函数可以是任何东西——我们可以以小时、年或数据值的某个属性来分组。唯一的限制是数据必须是连续的。也就是说,如果我们有一个输入是A A A A B B A A并以字母分组,我们就会得到3个组,分别是(A, [A, A, A, A])、(B, [B, B])以及(A, [A, A])。
例5-3 对我们的数据分组
from datetime import date from itertools import groupby def day_grouper(iterable): key = lambda (timestamp, value) : date.fromtimestamp(timestamp) return groupby(iterable, key)
现在来进行实际的异常检测。我们会遍历一天内的值并记录平均值和最大值。平均值的计算会使用一个在线的平均值和标准差算法。[1]保留最大值是因为它将是我们异常数据的最佳代表——如果最大值比平均值的3sigma大,那么我们就返回代表这一天的date对象。否则,我们返回False。不过,我们也可以直接终止函数(并返回None)。我们输出这些值是因为这个check_anomaly函数被定义为一个数据过滤器——一种对需要保留的数据返回True而对需要丢弃的数据返回False的函数。这让我们可以过滤原始的数据集并只保留符合我们条件的日子。check_anomaly函数如例5-4所示。
例5-4 基于生成器的异常检测
import math def check_anomaly((day, day_data)): # We find the mean, standard deviation, and maximum values for the day. # Using a single-pass mean/standard deviation algorithm allows us to only # read through the day's data once. n = 0 mean = 0 M2 = 0 max_value = None for timestamp, value in day_data: n += 1 delta = value - mean mean = mean + delta/n M2 += delta*(value - mean) max_value = max(max_value, value) variance = M2/(n - 1) standard_deviation = math.sqrt(variance) # Here is the actual check of whether that day's data is anomalous. If it # is, we return the value of the day; otherwise, we return false. if max_value > mean + 3 * standard_deviation: return day return False
这个函数一个看上去可能有点奇怪的地方是参数定义中额外的一组小括号。这不是打字错误,而是因为该函数的输入来自groupby生成器。记住groupby返回的元组会成为这个check_anomaly函数的参数。因此,我们必须进行元组展开来正确获取组的关键字和组的数据。由于我们使用了ifilter,另一种不需要在函数定义中进行元组展开的处理方式是定义istarfilter,类似于istarmap对imap做的处理那样(更多信息参见itertools文档)。
最后,我们可以将生成器链接来获得所有具有异常数据的日子(例5-5)。
例5-5 将我们的生成器链接起来
from itertools import ifilter, imap data = read_data(data_filename) data_day = day_grouper(data) anomalous_dates = ifilter(None, imap(check_anomaly, data_day)) <em>#</em>❶ first_anomalous_date, first_anomalous_data = anomalous_dates.next() print "The first anomalous date is: ", first_anomalous_date
❶ ifilter会移除所有不满足给定过滤器的元素。默认情况(第一个参数为None)下,ifilter会过滤掉所有被估值为False的元素。这样我们就不会包含那些check_anomaly认为不存在异常的日子。
这个方法非常简单地允许我们获得异常日子的列表而不需要读取整个数据集。需要注意的一件事是这段代码并没有真正进行任何计算,它只是为计算逻辑设置了一条流水线。在我们执行anomalous_dates.next()或以某种方式对anomalous_dates进行遍历之前,文件永远不会被读取。事实上,如果我们的整个数据集中有5个异常日子,而我们的代码在读取了第一个之后就停止,那么文件就只会被读到第一个日子的数据出现为止。这就叫延迟估值——只有被明确要求的计算才会被执行,如果出现了一个提前终止的条件,那么就可以大幅降低整体的运行时间。
用这种方式组织代码的另一个好处是它允许我们在不需要重写大部分代码的情况下轻松改换更复杂的计算逻辑。比如,如果我们想要一个日内移动窗口的分组而不是按日分组,我们可以简单写一个新的day_grouper来替换:
from datetime import datetime def rolling_window_grouper(data, window_size=3600): window = tuple(islice(data, 0, window_size)) while True: current_datetime = datetime.fromtimestamp(window[0][0]) yield (current_datetime, window) window = window[1:] + (data.next(),)
现在,我们只需用这个rolling_window_grouper简单替换例5-5中的day_grouper就能获得我们想要的结果。使用这种模式,我们还能看到这两个方法都具有十分清晰的内存保证——它将仅保存窗口内的数据作为其状态(两个方法分别需要保存一整日的数据或3600个数据点)。如果就连数据集的这部分采样都依然无法全部放入内存,那么我们还可以通过多次打开文件并使用不同的文件描述符来指向我们实际需要的数据(或使用linecache模块)来解决这个问题。
最后一点:在rolling_window_grouper函数中,我们对window列表进行了很多pop和append操作。我们可以使用collections模块的deque对象来大大优化这部分代码。这个对象可以在列表的头部和尾部均提供O(1)的添加和删除操作(传统的列表只能对列表尾部提供O(1)的添加删除操作,而对列表的头部进行同样的操作则是O(n))。使用deque对象,我们可以使用append将新数据添加到列表的右侧(或者说尾部),并使用deque.popleft()来删除列表左侧(或者说头部)的数据而无须分配更多空间或进行耗时的O(n)操作。
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论