MapReduce 和分布式文件系统
MapReduce 有点像 Unix 工具,但分布在数千台机器上。像 Unix 工具一样,它相当简单粗暴,但令人惊异地管用。一个 MapReduce 作业可以和一个 Unix 进程相类比:它接受一个或多个输入,并产生一个或多个输出。
和大多数 Unix 工具一样,运行 MapReduce 作业通常不会修改输入,除了生成输出外没有任何副作用。输出文件以连续的方式一次性写入(一旦写入文件,不会修改任何现有的文件部分)。
虽然 Unix 工具使用 stdin
和 stdout
作为输入和输出,但 MapReduce 作业在分布式文件系统上读写文件。在 Hadoop 的 Map-Reduce 实现中,该文件系统被称为 HDFS(Hadoop 分布式文件系统) ,一个 Google 文件系统(GFS)的开源实现【19】。
除 HDFS 外,还有各种其他分布式文件系统,如 GlusterFS 和 Quantcast File System(QFS)【20】。诸如 Amazon S3,Azure Blob 存储和 OpenStack Swift 【21】等对象存储服务在很多方面都是相似的iv 。在本章中,我们将主要使用 HDFS 作为示例,但是这些原则适用于任何分布式文件系统。
iv. 一个不同之处在于,对于 HDFS,可以将计算任务安排在存储特定文件副本的计算机上运行,而对象存储通常将存储和计算分开。如果网络带宽是一个瓶颈,从本地磁盘读取有性能优势。但是请注意,如果使用纠删码,则会丢失局部性,因为来自多台机器的数据必须进行合并以重建原始文件【20】。 ↩
与网络连接存储(NAS)和存储区域网络(SAN)架构的共享磁盘方法相比,HDFS 基于 无共享 原则(参见 第二部分前言 )。共享磁盘存储由集中式存储设备实现,通常使用定制硬件和专用网络基础设施(如光纤通道)。而另一方面,无共享方法不需要特殊的硬件,只需要通过传统数据中心网络连接的计算机。
HDFS 包含在每台机器上运行的守护进程,对外暴露网络服务,允许其他节点访问存储在该机器上的文件(假设数据中心中的每台通用计算机都挂载着一些磁盘)。名为 NameNode 的中央服务器会跟踪哪个文件块存储在哪台机器上。因此,HDFS 在概念上创建了一个大型文件系统,可以使用所有运行有守护进程的机器的磁盘。
为了容忍机器和磁盘故障,文件块被复制到多台机器上。复制可能意味着多个机器上的相同数据的多个副本,如 第 5 章 中所述,或者诸如 Reed-Solomon 码这样的纠删码方案,它允许以比完全复制更低的存储开销以恢复丢失的数据【20,22】。这些技术与 RAID 相似,可以在连接到同一台机器的多个磁盘上提供冗余;区别在于在分布式文件系统中,文件访问和复制是在传统的数据中心网络上完成的,没有特殊的硬件。
HDFS 已经扩展的很不错了:在撰写本书时,最大的 HDFS 部署运行在上万台机器上,总存储容量达数百 PB【23】。如此大的规模已经变得可行,因为使用商品硬件和开源软件的 HDFS 上的数据存储和访问成本远低于专用存储设备上的同等容量【24】。
MapReduce 作业执行
MapReduce 是一个编程框架,你可以使用它编写代码来处理 HDFS 等分布式文件系统中的大型数据集。理解它的最简单方法是参考 简单日志分析 中的 Web 服务器日志分析示例。MapReduce 中的数据处理模式与此示例非常相似:
- 读取一组输入文件,并将其分解成 记录(records) 。在 Web 服务器日志示例中,每条记录都是日志中的一行(即
\n
是记录分隔符)。 - 调用 Mapper 函数,从每条输入记录中提取一对键值。在前面的例子中,Mapper 函数是
awk '{print $7}'
:它提取 URL($7
)作为关键字,并将值留空。 - 按键排序所有的键值对。在日志的例子中,这由第一个
sort
命令完成。 - 调用 Reducer 函数遍历排序后的键值对。如果同一个键出现多次,排序使它们在列表中相邻,所以很容易组合这些值而不必在内存中保留很多状态。在前面的例子中,Reducer 是由
uniq -c
命令实现的,该命令使用相同的键来统计相邻记录的数量。
这四个步骤可以作为一个 MapReduce 作业执行。步骤 2(Map)和 4(Reduce)是你编写自定义数据处理代码的地方。步骤 1(将文件分解成记录)由输入格式解析器处理。步骤 3 中的排序步骤隐含在 MapReduce 中 —— 你不必编写它,因为 Mapper 的输出始终在送往 Reducer 之前进行排序。
要创建 MapReduce 作业,你需要实现两个回调函数,Mapper 和 Reducer,其行为如下(参阅 MapReduce 查询 ):
Mapper
Mapper 会在每条输入记录上调用一次,其工作是从输入记录中提取键值。对于每个输入,它可以生成任意数量的键值对(包括 None)。它不会保留从一个输入记录到下一个记录的任何状态,因此每个记录都是独立处理的。
Reducer MapReduce 框架拉取由 Mapper 生成的键值对,收集属于同一个键的所有值,并使用在这组值列表上迭代调用 Reducer。 Reducer 可以产生输出记录(例如相同 URL 的出现次数)。
在 Web 服务器日志的例子中,我们在第 5 步中有第二个 sort
命令,它按请求数对 URL 进行排序。在 MapReduce 中,如果你需要第二个排序阶段,则可以通过编写第二个 MapReduce 作业并将第一个作业的输出用作第二个作业的输入来实现它。这样看来,Mapper 的作用是将数据放入一个适合排序的表单中,并且 Reducer 的作用是处理已排序的数据。
分布式执行 MapReduce
MapReduce 与 Unix 命令管道的主要区别在于,MapReduce 可以在多台机器上并行执行计算,而无需编写代码来显式处理并行问题。Mapper 和 Reducer 一次只能处理一条记录;它们不需要知道它们的输入来自哪里,或者输出去往什么地方,所以框架可以处理在机器之间移动数据的复杂性。
在分布式计算中可以使用标准的 Unix 工具作为 Mapper 和 Reducer【25】,但更常见的是,它们被实现为传统编程语言的函数。在 Hadoop MapReduce 中,Mapper 和 Reducer 都是实现特定接口的 Java 类。在 MongoDB 和 CouchDB 中,Mapper 和 Reducer 都是 JavaScript 函数(参阅 MapReduce 查询 )。
图 10-1 显示了 Hadoop MapReduce 作业中的数据流。其并行化基于分区(参见 第 6 章 ):作业的输入通常是 HDFS 中的一个目录,输入目录中的每个文件或文件块都被认为是一个单独的分区,可以单独处理 map 任务( 图 10-1 中的 m1,m2 和 m3 标记)。
每个输入文件的大小通常是数百兆字节。 MapReduce 调度器(图中未显示)试图在其中一台存储输入文件副本的机器上运行每个 Mapper,只要该机器有足够的备用 RAM 和 CPU 资源来运行 Mapper 任务【26】。这个原则被称为 将计算放在数据附近 【27】:它节省了通过网络复制输入文件的开销,减少网络负载并增加局部性。
图 10-1 具有三个 Mapper 和三个 Reducer 的 MapReduce 任务
在大多数情况下,应该在 Mapper 任务中运行的应用代码在将要运行它的机器上还不存在,所以 MapReduce 框架首先将代码(例如 Java 程序中的 JAR 文件)复制到适当的机器。然后启动 Map 任务并开始读取输入文件,一次将一条记录传入 Mapper 回调函数。Mapper 的输出由键值对组成。
计算的 Reduce 端也被分区。虽然 Map 任务的数量由输入文件块的数量决定,但 Reducer 的任务的数量是由作业作者配置的(它可以不同于 Map 任务的数量)。为了确保具有相同键的所有键值对最终落在相同的 Reducer 处,框架使用键的散列值来确定哪个 Reduce 任务应该接收到特定的键值对(参见 按键散列分区 ))。
键值对必须进行排序,但数据集可能太大,无法在单台机器上使用常规排序算法进行排序。相反,分类是分阶段进行的。首先每个 Map 任务都按照 Reducer 对输出进行分区。每个分区都被写入 Mapper 程序的本地磁盘,使用的技术与我们在 SSTables 与 LSM 树 中讨论的类似。
只要当 Mapper 读取完输入文件,并写完排序后的输出文件,MapReduce 调度器就会通知 Reducer 可以从该 Mapper 开始获取输出文件。Reducer 连接到每个 Mapper,并下载自己相应分区的有序键值对文件。按 Reducer 分区,排序,从 Mapper 向 Reducer 复制分区数据,这一整个过程被称为 混洗(shuffle) 【26】(一个容易混淆的术语 —— 不像洗牌,在 MapReduce 中的混洗没有随机性)。
Reduce 任务从 Mapper 获取文件,并将它们合并在一起,并保留有序特性。因此,如果不同的 Mapper 生成了键相同的记录,则在 Reducer 的输入中,这些记录将会相邻。
Reducer 调用时会收到一个键,和一个迭代器作为参数,迭代器会顺序地扫过所有具有该键的记录(因为在某些情况可能无法完全放入内存中)。Reducer 可以使用任意逻辑来处理这些记录,并且可以生成任意数量的输出记录。这些输出记录会写入分布式文件系统上的文件中(通常是在跑 Reducer 的机器本地磁盘上留一份,并在其他机器上留几份副本)。
MapReduce 工作流
单个 MapReduce 作业可以解决的问题范围很有限。以日志分析为例,单个 MapReduce 作业可以确定每个 URL 的页面浏览次数,但无法确定最常见的 URL,因为这需要第二轮排序。
因此将 MapReduce 作业链接成为 工作流(workflow) 中是极为常见的,例如,一个作业的输出成为下一个作业的输入。 Hadoop Map-Reduce 框架对工作流没有特殊支持,所以这个链是通过目录名隐式实现的:第一个作业必须将其输出配置为 HDFS 中的指定目录,第二个作业必须将其输入配置为从同一个目录。从 MapReduce 框架的角度来看,这是是两个独立的作业。
因此,被链接的 MapReduce 作业并没有那么像 Unix 命令管道(它直接将一个进程的输出作为另一个进程的输入,仅用一个很小的内存缓冲区)。它更像是一系列命令,其中每个命令的输出写入临时文件,下一个命令从临时文件中读取。这种设计有利也有弊,我们将在 物化中间状态 中讨论。
只有当作业成功完成后,批处理作业的输出才会被视为有效的(MapReduce 会丢弃失败作业的部分输出)。因此,工作流中的一项作业只有在先前的作业 —— 即生产其输入的作业 —— 成功完成后才能开始。为了处理这些作业之间的依赖,有很多针对 Hadoop 的工作流调度器被开发出来,包括 Oozie,Azkaban,Luigi,Airflow 和 Pinball 【28】。
这些调度程序还具有管理功能,在维护大量批处理作业时非常有用。在构建推荐系统时,由 50 到 100 个 MapReduce 作业组成的工作流是常见的【29】。而在大型组织中,许多不同的团队可能运行不同的作业来读取彼此的输出。工具支持对于管理这样复杂的数据流而言非常重要。
Hadoop 的各种高级工具(如 Pig 【30】,Hive 【31】,Cascading 【32】,Crunch 【33】和 FlumeJava 【34】)也能自动布线组装多个 MapReduce 阶段,生成合适的工作流。
Reduce 端连接与分组
我们在 第 2 章 中讨论了数据模型和查询语言的联接,但是我们还没有深入探讨连接是如何实现的。现在是我们再次捡起这条线索的时候了。
在许多数据集中,一条记录与另一条记录存在关联是很常见的:关系模型中的 外键 ,文档模型中的 文档引用 或图模型中的 边 。当你需要同时访问这一关联的两侧(持有引用的记录与被引用的记录)时,连接就是必须的。(包含引用的记录和被引用的记录),连接就是必需的。正如 第 2 章 所讨论的,非规范化可以减少对连接的需求,但通常无法将其完全移除v 。
v. 我们在本书中讨论的连接通常是等值连接,即最常见的连接类型,其中记录与其他记录在特定字段(例如 ID)中具有 相同值 相关联。有些数据库支持更通用的连接类型,例如使用小于运算符而不是等号运算符,但是我们没有地方来讲这些东西。 ↩
在数据库中,如果执行只涉及少量记录的查询,数据库通常会使用 索引 来快速定位感兴趣的记录(参阅 第 3 章 )。如果查询涉及到连接,则可能涉及到查找多个索引。然而 MapReduce 没有索引的概念 —— 至少在通常意义上没有。
当 MapReduce 作业被赋予一组文件作为输入时,它读取所有这些文件的全部内容;数据库会将这种操作称为 全表扫描 。如果你只想读取少量的记录,则全表扫描与索引查询相比,代价非常高昂。但是在分析查询中(参阅 事务处理或分析? ),通常需要计算大量记录的聚合。在这种情况下,特别是如果能在多台机器上并行处理时,扫描整个输入可能是相当合理的事情。
当我们在批处理的语境中讨论连接时,我们指的是在数据集中解析某种关联的全量存在。 例如我们假设一个作业是同时处理所有用户的数据,而非仅仅是为某个特定用户查找数据(而这能通过索引更高效地完成)。
示例:分析用户活动事件
图 10-2 给出了一个批处理作业中连接的典型例子。左侧是事件日志,描述登录用户在网站上做的事情(称为 活动事件(activity events) 或 点击流数据(clickstream data) ),右侧是用户数据库。 你可以将此示例看作是星型模式的一部分(参阅 星型和雪花型:分析的模式 ):事件日志是事实表,用户数据库是其中的一个维度。
图 10-2 用户行为日志与用户档案的连接
分析任务可能需要将用户活动与用户简档相关联:例如,如果档案包含用户的年龄或出生日期,系统就可以确定哪些页面更受哪些年龄段的用户欢迎。然而活动事件仅包含用户 ID,而没有包含完整的用户档案信息。在每个活动事件中嵌入这些档案信息很可能会非常浪费。因此,活动事件需要与用户档案数据库相连接。
实现这一连接的最简单方法是,逐个遍历活动事件,并为每个遇到的用户 ID 查询用户数据库(在远程服务器上)。这是可能的,但是它的性能可能会非常差:处理吞吐量将受限于受数据库服务器的往返时间,本地缓存的有效性很大程度上取决于数据的分布,并行运行大量查询可能会轻易压垮数据库【35】。
为了在批处理过程中实现良好的吞吐量,计算必须(尽可能)限于单台机器上进行。为待处理的每条记录发起随机访问的网络请求实在是太慢了。而且,查询远程数据库意味着批处理作业变为 非确定的(nondeterministic) ,因为远程数据库中的数据可能会改变。
因此,更好的方法是获取用户数据库的副本(例如,使用 ETL 进程从数据库备份中提取数据,参阅 数据仓库 ),并将它和用户行为日志放入同一个分布式文件系统中。然后你可以将用户数据库存储在 HDFS 中的一组文件中,而用户活动记录存储在另一组文件中,并能用 MapReduce 将所有相关记录集中到同一个地方进行高效处理。
排序合并连接
回想一下,Mapper 的目的是从每个输入记录中提取一对键值。在 图 10-2 的情况下,这个键就是用户 ID:一组 Mapper 会扫过活动事件(提取用户 ID 作为键,活动事件作为值),而另一组 Mapper 将会扫过用户数据库(提取用户 ID 作为键,用户的出生日期作为值)。这个过程如 图 10-3 所示。
图 10-3 在用户 ID 上进行的 Reduce 端连接。如果输入数据集分区为多个文件,则每个分区都会被多个 Mapper 并行处理
当 MapReduce 框架通过键对 Mapper 输出进行分区,然后对键值对进行排序时,效果是具有相同 ID 的所有活动事件和用户记录在 Reducer 输入中彼此相邻。 Map-Reduce 作业甚至可以也让这些记录排序,使 Reducer 总能先看到来自用户数据库的记录,紧接着是按时间戳顺序排序的活动事件 —— 这种技术被称为 二次排序(secondary sort) 【26】。
然后 Reducer 可以容易地执行实际的连接逻辑:每个用户 ID 都会被调用一次 Reducer 函数,且因为二次排序,第一个值应该是来自用户数据库的出生日期记录。 Reducer 将出生日期存储在局部变量中,然后使用相同的用户 ID 遍历活动事件,输出 已观看网址 和 观看者年龄 的结果对。随后的 Map-Reduce 作业可以计算每个 URL 的查看者年龄分布,并按年龄段进行聚集。
由于 Reducer 一次处理一个特定用户 ID 的所有记录,因此一次只需要将一条用户记录保存在内存中,而不需要通过网络发出任何请求。这个算法被称为 排序合并连接(sort-merge join) ,因为 Mapper 的输出是按键排序的,然后 Reducer 将来自连接两侧的有序记录列表合并在一起。
把相关数据放在一起
在排序合并连接中,Mapper 和排序过程确保了所有对特定用户 ID 执行连接操作的必须数据都被放在同一个地方:单次调用 Reducer 的地方。预先排好了所有需要的数据,Reducer 可以是相当简单的单线程代码,能够以高吞吐量和与低内存开销扫过这些记录。
这种架构可以看做,Mapper 将 消息 发送给 Reducer。当一个 Mapper 发出一个键值对时,这个键的作用就像值应该传递到的目标地址。即使键只是一个任意的字符串(不是像 IP 地址和端口号那样的实际的网络地址),它表现的就像一个地址:所有具有相同键的键值对将被传递到相同的目标(一次 Reduce 的调用)。
使用 MapReduce 编程模型,能将计算的物理网络通信层面(从正确的机器获取数据)从应用逻辑中剥离出来(获取数据后执行处理)。这种分离与数据库的典型用法形成了鲜明对比,从数据库中获取数据的请求经常出现在应用代码内部【36】。由于 MapReduce 能够处理所有的网络通信,因此它也避免了应用代码去担心部分故障,例如另一个节点的崩溃:MapReduce 在不影响应用逻辑的情况下能透明地重试失败的任务。
GROUP BY
除了连接之外, 把相关数据放在一起 的另一种常见模式是,按某个键对记录分组(如 SQL 中的 GROUP BY 子句)。所有带有相同键的记录构成一个组,而下一步往往是在每个组内进行某种聚合操作,例如:
- 统计每个组中记录的数量(例如在统计 PV 的例子中,在 SQL 中表示为
COUNT(*)
聚合) - 对某个特定字段求和(SQL 中的
SUM(fieldname)
) - 按某种分级函数取出排名前 k 条记录。
使用 MapReduce 实现这种分组操作的最简单方法是设置 Mapper,以便它们生成的键值对使用所需的分组键。然后分区和排序过程将所有具有相同分区键的记录导向同一个 Reducer。因此在 MapReduce 之上实现分组和连接看上去非常相似。
分组的另一个常见用途是整理特定用户会话的所有活动事件,以找出用户进行的一系列操作(称为 会话化(sessionization) 【37】)。例如,可以使用这种分析来确定显示新版网站的用户是否比那些显示旧版本(A/B 测试)的用户更有购买欲,或者计算某个营销活动是否值得。
如果你有多个 Web 服务器处理用户请求,则特定用户的活动事件很可能分散在各个不同的服务器的日志文件中。你可以通过使用会话 cookie,用户 ID 或类似的标识符作为分组键,以将特定用户的所有活动事件放在一起来实现会话化,与此同时,不同用户的事件仍然散步在不同的分区中。
处理倾斜
如果存在与单个键关联的大量数据,则 将具有相同键的所有记录放到相同的位置 这种模式就被破坏了。例如在社交网络中,大多数用户可能会与几百人有连接,但少数名人可能有数百万的追随者。这种不成比例的活动数据库记录被称为 关键对象(linchpin object) 【38】或 热键(hot key) 。
在单个 Reducer 中收集与某个名流相关的所有活动(例如他们发布内容的回复)可能导致严重的倾斜(也称为 热点(hot spot) )—— 也就是说,一个 Reducer 必须比其他 Reducer 处理更多的记录(参见 负载倾斜与消除热点 )。由于 MapReduce 作业只有在所有 Mapper 和 Reducer 都完成时才完成,所有后续作业必须等待最慢的 Reducer 才能启动。
如果连接的输入存在热点键,可以使用一些算法进行补偿。例如,Pig 中的 倾斜连接(skewed join) 方法首先运行一个抽样作业来确定哪些键是热键【39】。连接实际执行时,Mapper 会将热键的关联记录 随机 (相对于传统 MapReduce 基于键散列的确定性方法)发送到几个 Reducer 之一。对于另外一侧的连接输入,与热键相关的记录需要被复制到所有处理该键的 Reducer 上【40】。
这种技术将处理热键的工作分散到多个 Reducer 上,这样可以使其更好地并行化,代价是需要将连接另一侧的输入记录复制到多个 Reducer 上。 Crunch 中的 分片连接(sharded join) 方法与之类似,但需要显式指定热键而不是使用采样作业。这种技术也非常类似于我们在 负载倾斜与消除热点 中讨论的技术,使用随机化来缓解分区数据库中的热点。
Hive 的偏斜连接优化采取了另一种方法。它需要在表格元数据中显式指定热键,并将与这些键相关的记录单独存放,与其它文件分开。当在该表上执行连接时,对于热键,它会使用 Map 端连接(参阅 下一节 )。
当按照热键进行分组并聚合时,可以将分组分两个阶段进行。第一个 MapReduce 阶段将记录发送到随机 Reducer,以便每个 Reducer 只对热键的子集执行分组,为每个键输出一个更紧凑的中间聚合结果。然后第二个 MapReduce 作业将所有来自第一阶段 Reducer 的中间聚合结果合并为每个键一个值。
Map 端连接
上一节描述的连接算法在 Reducer 中执行实际的连接逻辑,因此被称为 Reduce 端连接。Mapper 扮演着预处理输入数据的角色:从每个输入记录中提取键值,将键值对分配给 Reducer 分区,并按键排序。
Reduce 端方法的优点是不需要对输入数据做任何假设:无论其属性和结构如何,Mapper 都可以对其预处理以备连接。然而不利的一面是,排序,复制至 Reducer,以及合并 Reducer 输入,所有这些操作可能开销巨大。当数据通过 MapReduce 阶段时,数据可能需要落盘好几次,取决于可用的内存缓冲区【37】。
另一方面,如果你 能 对输入数据作出某些假设,则通过使用所谓的 Map 端连接来加快连接速度是可行的。这种方法使用了一个阉掉 Reduce 与排序的 MapReduce 作业,每个 Mapper 只是简单地从分布式文件系统中读取一个输入文件块,然后将输出文件写入文件系统,仅此而已。
广播散列连接
适用于执行 Map 端连接的最简单场景是大数据集与小数据集连接的情况。要点在于小数据集需要足够小,以便可以将其全部加载到每个 Mapper 的内存中。
例如,假设在 图 10-2 的情况下,用户数据库小到足以放进内存中。在这种情况下,当 Mapper 启动时,它可以首先将用户数据库从分布式文件系统读取到内存中的散列中。完成此操作后,Map 程序可以扫描用户活动事件,并简单地在散列表中查找每个事件的用户 IDvi 。
vi. 这个例子假定散列表中的每个键只有一个条目,这对用户数据库(用户 ID 唯一标识一个用户)可能是正确的。通常,哈希表可能需要包含具有相同键的多个条目,而连接运算符将对每个键输出所有的匹配。 ↩
参与连接的较大输入的每个文件块各有一个 Mapper(在 图 10-2 的例子中活动事件是较大的输入)。每个 Mapper 都会将较小输入整个加载到内存中。
这种简单有效的算法被称为 广播散列连接(broadcast hash join) : 广播 一词反映了这样一个事实,每个连接较大输入端分区的 Mapper 都会将较小输入端数据集整个读入内存中(所以较小输入实际上 广播 到较大数据的所有分区上), 散列 一词反映了它使用一个散列表。 Pig(名为 复制链接(replicated join) ),Hive( MapJoin ),Cascading 和 Crunch 支持这种连接。它也被诸如 Impala 的数据仓库查询引擎使用【41】。
除了将连接较小输入加载到内存散列表中,另一种方法是将较小输入存储在本地磁盘上的只读索引中【42】。索引中经常使用的部分将保留在操作系统的页面缓存中,因而这种方法可以提供与内存散列表几乎一样快的随机查找性能,但实际上并不需要数据集能放入内存中。
分区散列连接
如果 Map 端连接的输入以相同的方式进行分区,则散列连接方法可以独立应用于每个分区。在 图 10-2 的情况中,你可以根据用户 ID 的最后一位十进制数字来对活动事件和用户数据库进行分区(因此连接两侧各有 10 个分区)。例如,Mapper3 首先将所有具有以 3 结尾的 ID 的用户加载到散列表中,然后扫描 ID 为 3 的每个用户的所有活动事件。
如果分区正确无误,可以确定的是,所有你可能需要连接的记录都落在同一个编号的分区中。因此每个 Mapper 只需要从输入两端各读取一个分区就足够了。好处是每个 Mapper 都可以在内存散列表中少放点数据。
这种方法只有当连接两端输入有相同的分区数,且两侧的记录都是使用相同的键与相同的哈希函数做分区时才适用。如果输入是由之前执行过这种分组的 MapReduce 作业生成的,那么这可能是一个合理的假设。
分区散列连接在 Hive 中称为 Map 端桶连接(bucketed map joins)【37】 。
Map 端合并连接
如果输入数据集不仅以相同的方式进行分区,而且还基于相同的键进行 排序 ,则可适用另一种 Map 端联接的变体。在这种情况下,输入是否小到能放入内存并不重要,因为这时候 Mapper 同样可以执行归并操作(通常由 Reducer 执行)的归并操作:按键递增的顺序依次读取两个输入文件,将具有相同键的记录配对。
如果能进行 Map 端合并连接,这通常意味着前一个 MapReduce 作业可能一开始就已经把输入数据做了分区并进行了排序。原则上这个连接就可以在前一个作业的 Reduce 阶段进行。但使用独立的仅 Map 作业有时也是合适的,例如,分好区且排好序的中间数据集可能还会用于其他目的。
MapReduce 工作流与 Map 端连接
当下游作业使用 MapReduce 连接的输出时,选择 Map 端连接或 Reduce 端连接会影响输出的结构。Reduce 端连接的输出是按照 连接键 进行分区和排序的,而 Map 端连接的输出则按照与较大输入相同的方式进行分区和排序(因为无论是使用分区连接还是广播连接,连接较大输入端的每个文件块都会启动一个 Map 任务)。
如前所述,Map 端连接也对输入数据集的大小,有序性和分区方式做出了更多假设。在优化连接策略时,了解分布式文件系统中数据集的物理布局变得非常重要:仅仅知道编码格式和数据存储目录的名称是不够的;你还必须知道数据是按哪些键做的分区和排序,以及分区的数量。
在 Hadoop 生态系统中,这种关于数据集分区的元数据通常在 HCatalog 和 Hive Metastore 中维护【37】。
批处理工作流的输出
我们已经说了很多用于实现 MapReduce 工作流的算法,但却忽略了一个重要的问题:这些处理完成之后的最终结果是什么?我们最开始为什么要跑这些作业?
在数据库查询的场景中,我们将事务处理(OLTP)与分析两种目的区分开来(参阅 事务处理或分析? )。我们看到,OLTP 查询通常根据键查找少量记录,使用索引,并将其呈现给用户(比如在网页上)。另一方面,分析查询通常会扫描大量记录,执行分组与聚合,输出通常有着报告的形式:显示某个指标随时间变化的图表,或按照某种排位取前 10 项,或一些数字细化为子类。这种报告的消费者通常是需要做出商业决策的分析师或经理。
批处理放哪里合适?它不属于事务处理,也不是分析。它和分析比较接近,因为批处理通常会扫过输入数据集的绝大部分。然而 MapReduce 作业工作流与用于分析目的的 SQL 查询是不同的(参阅 Hadoop 与分布式数据库的对比 )。批处理过程的输出通常不是报表,而是一些其他类型的结构。
建立搜索索引
Google 最初使用 MapReduce 是为其搜索引擎建立索引,用了由 5 到 10 个 MapReduce 作业组成的工作流实现【1】。虽然 Google 后来也不仅仅是为这个目的而使用 MapReduce 【43】,但如果从构建搜索索引的角度来看,更能帮助理解 MapReduce。 (直至今日,Hadoop MapReduce 仍然是为 Lucene/Solr 构建索引的好方法【44】)
我们在 全文搜索和模糊索引 中简要地了解了 Lucene 这样的全文搜索索引是如何工作的:它是一个文件(关键词字典),你可以在其中高效地查找特定关键字,并找到包含该关键字的所有文档 ID 列表(文章列表)。这是一种非常简化的看法 —— 实际上,搜索索引需要各种额外数据,以便根据相关性对搜索结果进行排名,纠正拼写错误,解析同义词等等 —— 但这个原则是成立的。
如果需要对一组固定文档执行全文搜索,则批处理是一种构建索引的高效方法:Mapper 根据需要对文档集合进行分区,每个 Reducer 构建该分区的索引,并将索引文件写入分布式文件系统。构建这样的文档分区索引(参阅 分区和二级索引 )并行处理效果拔群。
由于按关键字查询搜索索引是只读操作,因而这些索引文件一旦创建就是不可变的。
如果索引的文档集合发生更改,一种选择是定期重跑整个索引工作流,并在完成后用新的索引文件批量替换以前的索引文件。如果只有少量的文档发生了变化,这种方法的计算成本可能会很高。但它的优点是索引过程很容易理解:文档进,索引出。
另一个选择是,可以增量建立索引。如 第 3 章 中讨论的,如果要在索引中添加,删除或更新文档,Lucene 会写新的段文件,并在后台异步合并压缩段文件。我们将在 第 11 章 中看到更多这种增量处理。
键值存储作为批处理输出
搜索索引只是批处理工作流可能输出的一个例子。批处理的另一个常见用途是构建机器学习系统,例如分类器(比如垃圾邮件过滤器,异常检测,图像识别)与推荐系统(例如,你可能认识的人,你可能感兴趣的产品或相关的搜索【29】)。
这些批处理作业的输出通常是某种数据库:例如,可以通过给定用户 ID 查询该用户推荐好友的数据库,或者可以通过产品 ID 查询相关产品的数据库【45】。
这些数据库需要被处理用户请求的 Web 应用所查询,而它们通常是独立于 Hadoop 基础设施的。那么批处理过程的输出如何回到 Web 应用可以查询的数据库中呢?
最直接的选择可能是,直接在 Mapper 或 Reducer 中使用你最爱数据库的客户端库,并从批处理作业直接写入数据库服务器,一次写入一条记录。它能工作(假设你的防火墙规则允许从你的 Hadoop 环境直接访问你的生产数据库),但这并不是一个好主意,出于以下几个原因:
- 正如前面在连接的上下文中讨论的那样,为每条记录发起一个网络请求,要比批处理任务的正常吞吐量慢几个数量级。即使客户端库支持批处理,性能也可能很差。
- MapReduce 作业经常并行运行许多任务。如果所有 Mapper 或 Reducer 都同时写入相同的输出数据库,并以批处理的预期速率工作,那么该数据库很可能被轻易压垮,其查询性能可能变差。这可能会导致系统其他部分的运行问题【35】。
- 通常情况下,MapReduce 为作业输出提供了一个干净利落的 全有或全无 保证:如果作业成功,则结果就是每个任务恰好执行一次所产生的输出,即使某些任务失败且必须一路重试。如果整个作业失败,则不会生成输出。然而从作业内部写入外部系统,会产生外部可见的副作用,这种副作用是不能以这种方式被隐藏的。因此,你不得不去操心部分完成的作业对其他系统可见的结果,并需要理解 Hadoop 任务尝试与预测执行的复杂性。
更好的解决方案是在批处理作业 内 创建一个全新的数据库,并将其作为文件写入分布式文件系统中作业的输出目录,就像上节中的搜索索引一样。这些数据文件一旦写入就是不可变的,可以批量加载到处理只读查询的服务器中。不少键值存储都支持在 MapReduce 作业中构建数据库文件,包括 Voldemort 【46】,Terrapin 【47】,ElephantDB 【48】和 HBase 批量加载【49】。
构建这些数据库文件是 MapReduce 的一种很好用法的使用方法:使用 Mapper 提取出键并按该键排序,现在已经是构建索引所必需的大量工作。由于这些键值存储大多都是只读的(文件只能由批处理作业一次性写入,然后就不可变),所以数据结构非常简单。比如它们就不需要 WAL(参阅 使 B 树可靠 )。
将数据加载到 Voldemort 时,服务器将继续用旧数据文件服务请求,同时将新数据文件从分布式文件系统复制到服务器的本地磁盘。一旦复制完成,服务器会自动将查询切换到新文件。如果在这个过程中出现任何问题,它可以轻易回滚至旧文件,因为它们仍然存在而且不可变【46】。
批处理输出的哲学
本章前面讨论过的 Unix 哲学( Unix 哲学 )鼓励以显式指明数据流的方式进行实验:程序读取输入并写入输出。在这一过程中,输入保持不变,任何先前的输出都被新输出完全替换,且没有其他副作用。这意味着你可以随心所欲地重新运行一个命令,略做改动或进行调试,而不会搅乱系统的状态。
MapReduce 作业的输出处理遵循同样的原理。通过将输入视为不可变且避免副作用(如写入外部数据库),批处理作业不仅实现了良好的性能,而且更容易维护:
- 如果在代码中引入了一个错误,而输出错误或损坏了,则可以简单地回滚到代码的先前版本,然后重新运行该作业,输出将重新被纠正。或者,甚至更简单,你可以将旧的输出保存在不同的目录中,然后切换回原来的目录。具有读写事务的数据库没有这个属性:如果你部署了错误的代码,将错误的数据写入数据库,那么回滚代码将无法修复数据库中的数据。 (能够从错误代码中恢复的概念被称为 人类容错(human fault tolerance) 【50】)
- 由于回滚很容易,比起在错误意味着不可挽回的伤害的环境,功能开发进展能快很多。这种 最小化不可逆性(minimizing irreversibility) 的原则有利于敏捷软件开发【51】。
- 如果 Map 或 Reduce 任务失败,MapReduce 框架将自动重新调度,并在同样的输入上再次运行它。如果失败是由代码中的错误造成的,那么它会不断崩溃,并最终导致作业在几次尝试之后失败。但是如果故障是由于临时问题导致的,那么故障就会被容忍。因为输入不可变,这种自动重试是安全的,而失败任务的输出会被 MapReduce 框架丢弃。
- 同一组文件可用作各种不同作业的输入,包括计算指标的监控作业可以评估作业的输出是否具有预期的性质(例如,将其与前一次运行的输出进行比较并测量差异) 。
- 与 Unix 工具类似,MapReduce 作业将逻辑与布线(配置输入和输出目录)分离,这使得关注点分离,可以重用代码:一个团队可以实现一个专注做好一件事的作业;而其他团队可以决定何时何地运行这项作业。
在这些领域,在 Unix 上表现良好的设计原则似乎也适用于 Hadoop,但 Unix 和 Hadoop 在某些方面也有所不同。例如,因为大多数 Unix 工具都假设输入输出是无类型文本文件,所以它们必须做大量的输入解析工作(本章开头的日志分析示例使用 {print $7}
来提取 URL)。在 Hadoop 上可以通过使用更结构化的文件格式消除一些低价值的语法转换:比如 Avro(参阅 Avro )和 Parquet(参阅 列存储 )经常使用,因为它们提供了基于模式的高效编码,并允许模式随时间推移而演进(见第 4 章)。
Hadoop 与分布式数据库的对比
正如我们所看到的,Hadoop 有点像 Unix 的分布式版本,其中 HDFS 是文件系统,而 MapReduce 是 Unix 进程的怪异实现(总是在 Map 阶段和 Reduce 阶段运行 sort
工具)。我们了解了如何在这些原语的基础上实现各种连接和分组操作。
当 MapReduce 论文发表时【1】,它从某种意义上来说 —— 并不新鲜。我们在前几节中讨论的所有处理和并行连接算法已经在十多年前所谓的 大规模并行处理(MPP, massively parallel processing) 数据库中实现了【3,40】。比如 Gamma database machine,Teradata 和 Tandem NonStop SQL 就是这方面的先驱【52】。
最大的区别是,MPP 数据库专注于在一组机器上并行执行分析 SQL 查询,而 MapReduce 和分布式文件系统【19】的组合则更像是一个可以运行任意程序的通用操作系统。
存储多样性
数据库要求你根据特定的模型(例如关系或文档)来构造数据,而分布式文件系统中的文件只是字节序列,可以使用任何数据模型和编码来编写。它们可能是数据库记录的集合,但同样可以是文本,图像,视频,传感器读数,稀疏矩阵,特征向量,基因组序列或任何其他类型的数据。
说白了,Hadoop 开放了将数据不加区分地转储到 HDFS 的可能性,允许后续再研究如何进一步处理【53】。相比之下,在将数据导入数据库专有存储格式之前,MPP 数据库通常需要对数据和查询模式进行仔细的前期建模。
在纯粹主义者看来,这种仔细的建模和导入似乎是可取的,因为这意味着数据库的用户有更高质量的数据来处理。然而实践经验表明,简单地使数据快速可用 —— 即使它很古怪,难以使用,使用原始格式 —— 也通常要比事先决定理想数据模型要更有价值【54】。
这个想法与数据仓库类似(参阅 数据仓库 ):将大型组织的各个部分的数据集中在一起是很有价值的,因为它可以跨越以前相分离的数据集进行连接。 MPP 数据库所要求的谨慎模式设计拖慢了集中式数据收集速度;以原始形式收集数据,稍后再操心模式的设计,能使数据收集速度加快(有时被称为 数据湖(data lake) 或 企业数据中心(enterprise data hub) 【55】)。
不加区分的数据转储转移了解释数据的负担:数据集的生产者不再需要强制将其转化为标准格式,数据的解释成为消费者的问题( 读时模式 方法【56】;参阅 文档模型中的架构灵活性 )。如果生产者和消费者是不同优先级的不同团队,这可能是一种优势。甚至可能不存在一个理想的数据模型,对于不同目的有不同的合适视角。以原始形式简单地转储数据,可以允许多种这样的转换。这种方法被称为 寿司原则(sushi principle) : 原始数据更好 【57】。
因此,Hadoop 经常被用于实现 ETL 过程(参阅 数据仓库 ):事务处理系统中的数据以某种原始形式转储到分布式文件系统中,然后编写 MapReduce 作业来清理数据,将其转换为关系形式,并将其导入 MPP 数据仓库以进行分析。数据建模仍然在进行,但它在一个单独的步骤中进行,与数据收集相解耦。这种解耦是可行的,因为分布式文件系统支持以任何格式编码的数据。
处理模型多样性
MPP 数据库是单体的,紧密集成的软件,负责磁盘上的存储布局,查询计划,调度和执行。由于这些组件都可以针对数据库的特定需求进行调整和优化,因此整个系统可以在其设计针对的查询类型上取得非常好的性能。而且,SQL 查询语言允许以优雅的语法表达查询,而无需编写代码,使业务分析师用来做商业分析的可视化工具(例如 Tableau)能够访问。
另一方面,并非所有类型的处理都可以合理地表达为 SQL 查询。例如,如果要构建机器学习和推荐系统,或者使用相关性排名模型的全文搜索索引,或者执行图像分析,则很可能需要更一般的数据处理模型。这些类型的处理通常是特别针对特定应用的(例如机器学习的特征工程,机器翻译的自然语言模型,欺诈预测的风险评估函数),因此它们不可避免地需要编写代码,而不仅仅是查询。
MapReduce 使工程师能够轻松地在大型数据集上运行自己的代码。如果你有 HDFS 和 MapReduce,那么你 可以 在它之上建立一个 SQL 查询执行引擎,事实上这正是 Hive 项目所做的【31】。但是,你也可以编写许多其他形式的批处理,这些批处理不必非要用 SQL 查询表示。
随后,人们发现 MapReduce 对于某些类型的处理而言局限性很大,表现很差,因此在 Hadoop 之上其他各种处理模型也被开发出来(我们将在 MapReduce 之后 中看到其中一些)。有两种处理模型,SQL 和 MapReduce,还不够,需要更多不同的模型!而且由于 Hadoop 平台的开放性,实施一整套方法是可行的,而这在单体 MPP 数据库的范畴内是不可能的【58】。
至关重要的是,这些不同的处理模型都可以在共享的单个机器集群上运行,所有这些机器都可以访问分布式文件系统上的相同文件。在 Hadoop 方法中,不需要将数据导入到几个不同的专用系统中进行不同类型的处理:系统足够灵活,可以支持同一个群集内不同的工作负载。不需要移动数据,使得从数据中挖掘价值变得容易得多,也使采用新的处理模型容易的多。
Hadoop 生态系统包括随机访问的 OLTP 数据库,如 HBase(参阅 SSTables 和 LSM 树 )和 MPP 风格的分析型数据库,如 Impala 【41】。 HBase 与 Impala 都不使用 MapReduce,但都使用 HDFS 进行存储。它们是迥异的数据访问与处理方法,但是它们可以共存,并被集成到同一个系统中。
针对频繁故障设计
当比较 MapReduce 和 MPP 数据库时,两种不同的设计思路出现了:处理故障和使用内存与磁盘的方式。与在线系统相比,批处理对故障不太敏感,因为就算失败也不会立即影响到用户,而且它们总是能再次运行。
如果一个节点在执行查询时崩溃,大多数 MPP 数据库会中止整个查询,并让用户重新提交查询或自动重新运行它【3】。由于查询通常最多运行几秒钟或几分钟,所以这种错误处理的方法是可以接受的,因为重试的代价不是太大。 MPP 数据库还倾向于在内存中保留尽可能多的数据(例如,使用散列连接)以避免从磁盘读取的开销。
另一方面,MapReduce 可以容忍单个 Map 或 Reduce 任务的失败,而不会影响作业的整体,通过以单个任务的粒度重试工作。它也会非常急切地将数据写入磁盘,一方面是为了容错,另一部分是因为假设数据集太大而不能适应内存。
MapReduce 方式更适用于较大的作业:要处理如此之多的数据并运行很长时间的作业,以至于在此过程中很可能至少遇到一个任务故障。在这种情况下,由于单个任务失败而重新运行整个作业将是非常浪费的。即使以单个任务的粒度进行恢复引入了使得无故障处理更慢的开销,但如果任务失败率足够高,这仍然是一种合理的权衡。
但是这些假设有多么现实呢?在大多数集群中,机器故障确实会发生,但是它们不是很频繁 —— 可能少到绝大多数作业都不会经历机器故障。为了容错,真的值得带来这么大的额外开销吗?
要了解 MapReduce 节约使用内存和在任务的层次进行恢复的原因,了解最初设计 MapReduce 的环境是很有帮助的。 Google 有着混用的数据中心,在线生产服务和离线批处理作业在同样机器上运行。每个任务都有一个通过容器强制执行的资源配给(CPU 核心,RAM,磁盘空间等)。每个任务也具有优先级,如果优先级较高的任务需要更多的资源,则可以终止(抢占)同一台机器上较低优先级的任务以释放资源。优先级还决定了计算资源的定价:团队必须为他们使用的资源付费,而优先级更高的进程花费更多【59】。
这种架构允许非生产(低优先级)计算资源被 过量使用(overcommitted) ,因为系统知道必要时它可以回收资源。与分离生产和非生产任务的系统相比,过量使用资源可以更好地利用机器并提高效率。但由于 MapReduce 作业以低优先级运行,它们随时都有被抢占的风险,因为优先级较高的进程可能需要其资源。在高优先级进程拿走所需资源后,批量作业能有效地 捡面包屑 ,利用剩下的任何计算资源。
在谷歌,运行一个小时的 MapReduce 任务有大约有 5%的风险被终止,为了给更高优先级的进程挪地方。这一概率比硬件问题,机器重启或其他原因的概率高了一个数量级【59】。按照这种抢占率,如果一个作业有 100 个任务,每个任务运行 10 分钟,那么至少有一个任务在完成之前被终止的风险大于 50%。
这就是 MapReduce 被设计为容忍频繁意外任务终止的原因:不是因为硬件很不可靠,而是因为任意终止进程的自由有利于提高计算集群中的资源利用率。
在开源的集群调度器中,抢占的使用较少。 YARN 的 CapacityScheduler 支持抢占,以平衡不同队列的资源分配【58】,但在编写本文时,YARN,Mesos 或 Kubernetes 不支持通用优先级抢占【60】。在任务不经常被终止的环境中,MapReduce 的这一设计决策就没有多少意义了。在下一节中,我们将研究一些与 MapReduce 设计决策相异的替代方案。
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论