MapReduce 之后
虽然 MapReduce 在二十世纪二十年代后期变得非常流行,并受到大量的炒作,但它只是分布式系统的许多可能的编程模型之一。对于不同的数据量,数据结构和处理类型,其他工具可能更适合表示计算。
不管如何,我们在这一章花了大把时间来讨论 MapReduce,因为它是一种有用的学习工具,它是分布式文件系统的一种相当简单明晰的抽象。在这里, 简单 意味着我们能理解它在做什么,而不是意味着使用它很简单。恰恰相反:使用原始的 MapReduce API 来实现复杂的处理工作实际上是非常困难和费力的 —— 例如,任意一种连接算法都需要你从头开始实现【37】。
针对直接使用 MapReduce 的困难,在 MapReduce 上有很多高级编程模型(Pig,Hive,Cascading,Crunch)被创造出来,作为建立在 MapReduce 之上的抽象。如果你了解 MapReduce 的原理,那么它们学起来相当简单。而且它们的高级结构能显著简化许多常见批处理任务的实现。
但是,MapReduce 执行模型本身也存在一些问题,这些问题并没有通过增加另一个抽象层次而解决,而对于某些类型的处理,它表现得非常差劲。一方面,MapReduce 非常稳健:你可以使用它在任务会频繁终止的多租户系统上处理几乎任意大量级的数据,并且仍然可以完成工作(虽然速度很慢)。另一方面,对于某些类型的处理而言,其他工具有时会快上几个数量级。
在本章的其余部分中,我们将介绍一些批处理方法。在 第 11 章 我们将转向流处理,它可以看作是加速批处理的另一种方法。
物化中间状态
如前所述,每个 MapReduce 作业都独立于其他任何作业。作业与世界其他地方的主要连接点是分布式文件系统上的输入和输出目录。如果希望一个作业的输出成为第二个作业的输入,则需要将第二个作业的输入目录配置为第一个作业输出目录,且外部工作流调度程序必须在第一个作业完成后再启动第二个。
如果第一个作业的输出是要在组织内广泛发布的数据集,则这种配置是合理的。在这种情况下,你需要通过名称引用它,并将其重用为多个不同作业的输入(包括由其他团队开发的作业)。将数据发布到分布式文件系统中众所周知的位置能够带来 松耦合 ,这样作业就不需要知道是谁在提供输入或谁在消费输出(参阅 逻辑与布线相分离 )。
但在很多情况下,你知道一个作业的输出只能用作另一个作业的输入,这些作业由同一个团队维护。在这种情况下,分布式文件系统上的文件只是简单的 中间状态(intermediate state) :一种将数据从一个作业传递到下一个作业的方式。在一个用于构建推荐系统的,由 50 或 100 个 MapReduce 作业组成的复杂工作流中,存在着很多这样的中间状态【29】。
将这个中间状态写入文件的过程称为 物化(materialization) 。 (在 聚合:数据立方体和物化视图 中已经在物化视图的背景中遇到过这个术语。它意味着对某个操作的结果立即求值并写出来,而不是在请求时按需计算)
作为对照,本章开头的日志分析示例使用 Unix 管道将一个命令的输出与另一个命令的输入连接起来。管道并没有完全物化中间状态,而是只使用一个小的内存缓冲区,将输出增量地 流(stream) 向输入。
与 Unix 管道相比,MapReduce 完全物化中间状态的方法存在不足之处:
- MapReduce 作业只有在前驱作业(生成其输入)中的所有任务都完成时才能启动,而由 Unix 管道连接的进程会同时启动,输出一旦生成就会被消费。不同机器上的数据倾斜或负载不均意味着一个作业往往会有一些掉队的任务,比其他任务要慢得多才能完成。必须等待至前驱作业的所有任务完成,拖慢了整个工作流程的执行。
- Mapper 通常是多余的:它们仅仅是读取刚刚由 Reducer 写入的同样文件,为下一个阶段的分区和排序做准备。在许多情况下,Mapper 代码可能是前驱 Reducer 的一部分:如果 Reducer 和 Mapper 的输出有着相同的分区与排序方式,那么 Reducer 就可以直接串在一起,而不用与 Mapper 相互交织。
- 将中间状态存储在分布式文件系统中意味着这些文件被复制到多个节点,这些临时数据这么搞就比较过分了。
数据流引擎
了解决 MapReduce 的这些问题,几种用于分布式批处理的新执行引擎被开发出来,其中最著名的是 Spark 【61,62】,Tez 【63,64】和 Flink 【65,66】。它们的设计方式有很多区别,但有一个共同点:把整个工作流作为单个作业来处理,而不是把它分解为独立的子作业。
由于它们将工作流显式建模为 数据从几个处理阶段穿过,所以这些系统被称为 数据流引擎(dataflow engines) 。像 MapReduce 一样,它们在一条线上通过反复调用用户定义的函数来一次处理一条记录,它们通过输入分区来并行化载荷,它们通过网络将一个函数的输出复制到另一个函数的输入。
与 MapReduce 不同,这些功能不需要严格扮演交织的 Map 与 Reduce 的角色,而是可以以更灵活的方式进行组合。我们称这些函数为 算子(operators) ,数据流引擎提供了几种不同的选项来将一个算子的输出连接到另一个算子的输入:
- 一种选项是对记录按键重新分区并排序,就像在 MapReduce 的混洗阶段一样(参阅 分布式执行 MapReduce )。这种功能可以用于实现排序合并连接和分组,就像在 MapReduce 中一样。
- 另一种可能是接受多个输入,并以相同的方式进行分区,但跳过排序。当记录的分区重要但顺序无关紧要时,这省去了分区散列连接的工作,因为构建散列表还是会把顺序随机打乱。
- 对于广播散列连接,可以将一个算子的输出,发送到连接算子的所有分区。
这种类型的处理引擎是基于像 Dryad 【67】和 Nephele 【68】这样的研究系统,与 MapReduce 模型相比,它有几个优点:
- 排序等昂贵的工作只需要在实际需要的地方执行,而不是默认地在每个 Map 和 Reduce 阶段之间出现。
- 没有不必要的 Map 任务,因为 Mapper 所做的工作通常可以合并到前面的 Reduce 算子中(因为 Mapper 不会更改数据集的分区)。
- 由于工作流中的所有连接和数据依赖都是显式声明的,因此调度程序能够总览全局,知道哪里需要哪些数据,因而能够利用局部性进行优化。例如,它可以尝试将消费某些数据的任务放在与生成这些数据的任务相同的机器上,从而数据可以通过共享内存缓冲区传输,而不必通过网络复制。
- 通常,算子间的中间状态足以保存在内存中或写入本地磁盘,这比写入 HDFS 需要更少的 I/O(必须将其复制到多台机器,并将每个副本写入磁盘)。 MapReduce 已经对 Mapper 的输出做了这种优化,但数据流引擎将这种思想推广至所有的中间状态。
- 算子可以在输入就绪后立即开始执行;后续阶段无需等待前驱阶段整个完成后再开始。
- 与 MapReduce(为每个任务启动一个新的 JVM)相比,现有 Java 虚拟机(JVM)进程可以重用来运行新算子,从而减少启动开销。
你可以使用数据流引擎执行与 MapReduce 工作流同样的计算,而且由于此处所述的优化,通常执行速度要明显快得多。既然算子是 Map 和 Reduce 的泛化,那么相同的处理代码就可以在任一执行引擎上运行:Pig,Hive 或 Cascading 中实现的工作流可以无需修改代码,可以通过修改配置,简单地从 MapReduce 切换到 Tez 或 Spark【64】。
Tez 是一个相当薄的库,它依赖于 YARN shuffle 服务来实现节点间数据的实际复制【58】,而 Spark 和 Flink 则是包含了独立网络通信层,调度器,及用户向 API 的大型框架。我们将简要讨论这些高级 API。
容错
完全物化中间状态至分布式文件系统的一个优点是,它具有持久性,这使得 MapReduce 中的容错相当容易:如果一个任务失败,它可以在另一台机器上重新启动,并从文件系统重新读取相同的输入。
Spark,Flink 和 Tez 避免将中间状态写入 HDFS,因此它们采取了不同的方法来容错:如果一台机器发生故障,并且该机器上的中间状态丢失,则它会从其他仍然可用的数据重新计算(在可行的情况下是先前的中间状态,要么就只能是原始输入数据,通常在 HDFS 上)。
为了实现这种重新计算,框架必须跟踪一个给定的数据是如何计算的 —— 使用了哪些输入分区?应用了哪些算子? Spark 使用 弹性分布式数据集(RDD) 的抽象来跟踪数据的谱系【61】,而 Flink 对算子状态存档,允许恢复运行在执行过程中遇到错误的算子【66】。
在重新计算数据时,重要的是要知道计算是否是 确定性的 :也就是说,给定相同的输入数据,算子是否始终产生相同的输出?如果一些丢失的数据已经发送给下游算子,这个问题就很重要。如果算子重新启动,重新计算的数据与原有的丢失数据不一致,下游算子很难解决新旧数据之间的矛盾。对于不确定性算子来说,解决方案通常是杀死下游算子,然后再重跑新数据。
为了避免这种级联故障,最好让算子具有确定性。但需要注意的是,非确定性行为很容易悄悄溜进来:例如,许多编程语言在迭代哈希表的元素时不能对顺序作出保证,许多概率和统计算法显式依赖于使用随机数,以及用到系统时钟或外部数据源,这些都是都不确定性的行为。为了能可靠地从故障中恢复,需要消除这种不确定性因素,例如使用固定的种子生成伪随机数。
通过重算数据来从故障中恢复并不总是正确的答案:如果中间状态数据要比源数据小得多,或者如果计算量非常大,那么将中间数据物化为文件可能要比重新计算廉价的多。
关于物化的讨论
回到 Unix 的类比,我们看到,MapReduce 就像是将每个命令的输出写入临时文件,而数据流引擎看起来更像是 Unix 管道。尤其是 Flink 是基于管道执行的思想而建立的:也就是说,将算子的输出增量地传递给其他算子,不待输入完成便开始处理。
排序算子不可避免地需要消费全部的输入后才能生成任何输出,因为输入中最后一条输入记录可能具有最小的键,因此需要作为第一条记录输出。因此,任何需要排序的算子都需要至少暂时地累积状态。但是工作流的许多其他部分可以以流水线方式执行。
当作业完成时,它的输出需要持续到某个地方,以便用户可以找到并使用它—— 很可能它会再次写入分布式文件系统。因此,在使用数据流引擎时,HDFS 上的物化数据集通常仍是作业的输入和最终输出。和 MapReduce 一样,输入是不可变的,输出被完全替换。比起 MapReduce 的改进是,你不用再自己去将中间状态写入文件系统了。
图与迭代处理
在 图数据模型 中,我们讨论了使用图来建模数据,并使用图查询语言来遍历图中的边与点。 第 2 章 的讨论集中在 OLTP 风格的应用场景:快速执行查询来查找少量符合特定条件的顶点。
批处理上下文中的图也很有趣,其目标是在整个图上执行某种离线处理或分析。这种需求经常出现在机器学习应用(如推荐引擎)或排序系统中。例如,最着名的图形分析算法之一是 PageRank 【69】,它试图根据链接到某个网页的其他网页来估计该网页的流行度。它作为配方的一部分,用于确定网络搜索引擎呈现结果的顺序
像 Spark,Flink 和 Tez 这样的数据流引擎(参见 中间状态的物化 )通常将算子作为 有向无环图(DAG) 的一部分安排在作业中。这与图处理不一样:在数据流引擎中, 从一个算子到另一个算子的数据流 被构造成一个图,而数据本身通常由关系型元组构成。在图处理中,数据本身具有图的形式。又一个不幸的命名混乱!
许多图算法是通过一次遍历一条边来表示的,将一个顶点与近邻的顶点连接起来,以传播一些信息,并不断重复,直到满足一些条件为止 —— 例如,直到没有更多的边要跟进,或直到一些指标收敛。我们在 图 2-6 中看到一个例子,它通过重复跟进标明地点归属关系的边,生成了数据库中北美包含的所有地点列表(这种算法被称为 闭包传递(transitive closure) )。
可以在分布式文件系统中存储图(包含顶点和边的列表的文件),但是这种 重复至完成 的想法不能用普通的 MapReduce 来表示,因为它只扫过一趟数据。这种算法因此经常以 迭代 的风格实现:
- 外部调度程序运行批处理来计算算法的一个步骤。
- 当批处理过程完成时,调度器检查它是否完成(基于完成条件 —— 例如,没有更多的边要跟进,或者与上次迭代相比的变化低于某个阈值)。
- 如果尚未完成,则调度程序返回到步骤 1 并运行另一轮批处理。
这种方法是有效的,但是用 MapReduce 实现它往往非常低效,因为 MapReduce 没有考虑算法的迭代性质:它总是读取整个输入数据集并产生一个全新的输出数据集,即使与上次迭代相比,改变的仅仅是图中的一小部分。
Pregel 处理模型
针对图批处理的优化 —— 批量同步并行(BSP) 计算模型【70】已经开始流行起来。其中,Apache Giraph 【37】,Spark 的 GraphX API 和 Flink 的 Gelly API 【71】实现了它。它也被称为 Pregel 模型,因为 Google 的 Pregel 论文推广了这种处理图的方法【72】。
回想一下在 MapReduce 中,Mapper 在概念上向 Reducer 的特定调用 发送消息 ,因为框架将所有具有相同键的 Mapper 输出集中在一起。 Pregel 背后有一个类似的想法:一个顶点可以向另一个顶点 发送消息 ,通常这些消息是沿着图的边发送的。
在每次迭代中,为每个顶点调用一个函数,将所有发送给它的消息传递给它 —— 就像调用 Reducer 一样。与 MapReduce 的不同之处在于,在 Pregel 模型中,顶点在一次迭代到下一次迭代的过程中会记住它的状态,所以这个函数只需要处理新的传入消息。如果图的某个部分没有被发送消息,那里就不需要做任何工作。
这与 Actor 模型有些相似(参阅 分布式的 Actor 框架 ),除了顶点状态和顶点之间的消息具有容错性和耐久性,且通信以固定的方式进行:在每次迭代中,框架递送上次迭代中发送的所有消息。Actor 通常没有这样的时间保证。
容错
顶点只能通过消息传递进行通信(而不是直接相互查询)的事实有助于提高 Pregel 作业的性能,因为消息可以成批处理,且等待通信的次数也减少了。唯一的等待是在迭代之间:由于 Pregel 模型保证所有在一轮迭代中发送的消息都在下轮迭代中送达,所以在下一轮迭代开始前,先前的迭代必须完全完成,而所有的消息必须在网络上完成复制。
即使底层网络可能丢失,重复或任意延迟消息(参阅 不可靠的网络 ),Pregel 的实现能保证在后续迭代中消息在其目标顶点恰好处理一次。像 MapReduce 一样,框架能从故障中透明地恢复,以简化在 Pregel 上实现算法的编程模型。
这种容错是通过在迭代结束时,定期存档所有顶点的状态来实现的,即将其全部状态写入持久化存储。如果某个节点发生故障并且其内存中的状态丢失,则最简单的解决方法是将整个图计算回滚到上一个存档点,然后重启计算。如果算法是确定性的,且消息记录在日志中,那么也可以选择性地只恢复丢失的分区(就像之前讨论过的数据流引擎)【72】。
并行执行
顶点不需要知道它在哪台物理机器上执行;当它向其他顶点发送消息时,它只是简单地将消息发往某个顶点 ID。图的分区取决于框架 —— 即,确定哪个顶点运行在哪台机器上,以及如何通过网络路由消息,以便它们到达正确的地方。
由于编程模型一次仅处理一个顶点(有时称为 像顶点一样思考 ),所以框架可以以任意方式对图分区。理想情况下如果顶点需要进行大量的通信,那么它们最好能被分区到同一台机器上。然而找到这样一种优化的分区方法是很困难的 —— 在实践中,图经常按照任意分配的顶点 ID 分区,而不会尝试将相关的顶点分组在一起。
因此,图算法通常会有很多跨机器通信的额外开销,而中间状态(节点之间发送的消息)往往比原始图大。通过网络发送消息的开销会显着拖慢分布式图算法的速度。
出于这个原因,如果你的图可以放入一台计算机的内存中,那么单机(甚至可能是单线程)算法很可能会超越分布式批处理【73,74】。图比内存大也没关系,只要能放入单台计算机的磁盘,使用 GraphChi 等框架进行单机处理是就一个可行的选择【75】。如果图太大,不适合单机处理,那么像 Pregel 这样的分布式方法是不可避免的。高效的并行图算法是一个进行中的研究领域【76】。
高级 API 和语言
自 MapReduce 开始流行的这几年以来,分布式批处理的执行引擎已经很成熟了。到目前为止,基础设施已经足够强大,能够存储和处理超过 10,000 台机器群集上的数 PB 的数据。由于在这种规模下物理执行批处理的问题已经被认为或多或少解决了,所以关注点已经转向其他领域:改进编程模型,提高处理效率,扩大这些技术可以解决的问题集。
如前所述,Hive,Pig,Cascading 和 Crunch 等高级语言和 API 变得越来越流行,因为手写 MapReduce 作业实在是个苦力活。随着 Tez 的出现,这些高级语言还有一个额外好处,可以迁移到新的数据流执行引擎,而无需重写作业代码。 Spark 和 Flink 也有它们自己的高级数据流 API,通常是从 FlumeJava 中获取的灵感【34】。
这些数据流 API 通常使用关系型构建块来表达一个计算:按某个字段连接数据集;按键对元组做分组;按某些条件过滤;并通过计数求和或其他函数来聚合元组。在内部,这些操作是使用本章前面讨论过的各种连接和分组算法来实现的。
除了少写代码的明显优势之外,这些高级接口还支持交互式用法,在这种交互式使用中,你可以在 Shell 中增量式编写分析代码,频繁运行来观察它做了什么。这种开发风格在探索数据集和试验处理方法时非常有用。这也让人联想到 Unix 哲学,我们在 Unix 哲学 中讨论过这个问题。
此外,这些高级接口不仅提高了人类的工作效率,也提高了机器层面的作业执行效率。
向声明式查询语言的转变
与硬写执行连接的代码相比,指定连接关系算子的优点是,框架可以分析连接输入的属性,并自动决定哪种上述连接算法最适合当前任务。 Hive,Spark 和 Flink 都有基于代价的查询优化器可以做到这一点,甚至可以改变连接顺序,最小化中间状态的数量【66,77,78,79】。
连接算法的选择可以对批处理作业的性能产生巨大影响,而无需理解和记住本章中讨论的各种连接算法。如果连接是以 声明式(declarative) 的方式指定的,那这就这是可行的:应用只是简单地说明哪些连接是必需的,查询优化器决定如何最好地执行连接。我们以前在 数据查询语言 中见过这个想法。
但 MapReduce 及其后继者数据流在其他方面,与 SQL 的完全声明式查询模型有很大区别。 MapReduce 是围绕着回调函数的概念建立的:对于每条记录或者一组记录,调用一个用户定义的函数(Mapper 或 Reducer),并且该函数可以自由地调用任意代码来决定输出什么。这种方法的优点是可以基于大量已有库的生态系统创作:解析,自然语言分析,图像分析以及运行数值算法或统计算法等。
自由运行任意代码,长期以来都是传统 MapReduce 批处理系统与 MPP 数据库的区别所在(参见 比较 Hadoop 和分布式数据库 一节)。虽然数据库具有编写用户定义函数的功能,但是它们通常使用起来很麻烦,而且与大多数编程语言中广泛使用的程序包管理器和依赖管理系统兼容不佳(例如 Java 的 Maven, Javascript 的 npm,以及 Ruby 的 gems)。
然而数据流引擎已经发现,支持除连接之外的更多 声明式特性 还有其他的优势。例如,如果一个回调函数只包含一个简单的过滤条件,或者只是从一条记录中选择了一些字段,那么在为每条记录调用函数时会有相当大的额外 CPU 开销。如果以声明方式表示这些简单的过滤和映射操作,那么查询优化器可以利用面向列的存储布局(参阅 列存储 ),只从磁盘读取所需的列。 Hive,Spark DataFrames 和 Impala 还使用了向量化执行(参阅 内存带宽和向量处理 ):在对 CPU 缓存友好的内部循环中迭代数据,避免函数调用。Spark 生成 JVM 字节码【79】,Impala 使用 LLVM 为这些内部循环生成本机代码【41】。
通过在高级 API 中引入声明式的部分,并使查询优化器可以在执行期间利用这些来做优化,批处理框架看起来越来越像 MPP 数据库了(并且能实现可与之媲美的性能)。同时,通过拥有运行任意代码和以任意格式读取数据的可扩展性,它们保持了灵活性的优势。
专业化的不同领域
尽管能够运行任意代码的可扩展性是很有用的,但是也有很多常见的例子,不断重复着标准的处理模式。因而这些模式值得拥有自己的可重用通用构建模块实现,传统上,MPP 数据库满足了商业智能分析和业务报表的需求,但这只是许多使用批处理的领域之一。
另一个越来越重要的领域是统计和数值算法,它们是机器学习应用所需要的(例如分类器和推荐系统)。可重用的实现正在出现:例如,Mahout 在 MapReduce,Spark 和 Flink 之上实现了用于机器学习的各种算法,而 MADlib 在关系型 MPP 数据库(Apache HAWQ)中实现了类似的功能【54】。
空间算法也是有用的,例如 最近邻搜索(k-nearest neghbors, kNN) 【80】,它在一些多维空间中搜索与给定项最近的项目 —— 这是一种相似性搜索。近似搜索对于基因组分析算法也很重要,它们需要找到相似但不相同的字符串【81】。
批处理引擎正被用于分布式执行日益广泛的各领域算法。随着批处理系统获得各种内置功能以及高级声明式算子,且随着 MPP 数据库变得更加灵活和易于编程,两者开始看起来相似了:最终,它们都只是存储和处理数据的系统。
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论