弹性分布式数据集:内存计算集群的容错抽象
概要
迭代算法:梯度下降,就是迭代算法。无法获取最优解,就去获取近似解。
此外,如同Dijkstra 和bellman-ford算法也是迭代算法。
我们提出了弹性分布式数据集(RDD),一个分布式的内存抽象,它运行程序在大规模的集群上执行内存计算,同时保留类似Mapreduce的数据流模型容错。RDD由俩中类型的应用驱动,当前数据流系统处理效率低下:迭代算法,它在图计算和机器学习中以及交互式数据挖掘工具中很常见。在这些例子中,保持数据在内存中可以将性能提高一个数量级。为了有效的容错,RDD提供了高度受限的共享内存形式:它们是只读数据集,它们只能通过在其他RDD上批量操作来构建。然而我们展示了RDD由足够的表现力可以补货广泛的计算类,包括Mapreduce和特定的程序模型可以用于如Pregel这样的迭代作业.我们的RDD在迭代工作上能够达到hadoop20被的性能,它可以用来,交互式的检索1TB的数据集并且只有5-7分钟的延迟。
1 介绍
高级的客户端程序模型,类似Mapreduce和Dryad已经广泛的被用于工业中和科学中不断增长的数据量。这个系统通过自动提供自动感知调度我,容错和负载均衡简化了分布式程序,允许大范围的用户能在商用集群上分析大数据集。
Spark 相对于 MapReduce 的最重要改进之一是其 基于内存的计算模型 和更灵活的编程模型。
无环数据流模型(acyclic data flow model):是以有向无环图来描述计算过程的。数据在操作符之间按有向边的数据流动,而图的无环性保证了数据流不会回到之前的节点。
目前大多数集群计算系统是基于无环数据流模型,其中记录被加载到存储中(即 分布式文件系统),通过一个确定性的操作组成DAG并且写回稳态存储中。了解数据流图允许运行时自动跳读工作并且从失败中恢复。
无环数据流是一个强大的抽象,但是有些应用不能只使用这个结构来进行表述。这些应用在无环模型下表现不佳:只在多次并行中重用一组工作数据的情况。这类算法包括迭代算法,广泛的被用于在机器学习和图计算中,它们在每一步上对数据应用类似的函数,并且与数据挖掘工具交互,其中用户查询数据的子集。因为数据集的框架没有明确地提供对工作集的支持,这些应用程序必须将数据输出到磁盘,并且使用当前系统的每次查询时重新加载数据,导致了巨大的开销。
位置感知调度是一种优化策略,最小化数据移动,提高任务执行效率。它通过优先将任务调度到靠近数据存储位置的计算节点上,充分利用局部性。
我们提出了分布式内存抽象,叫做弹性分布数据集(resilient distributed dataset)它支持带有工作集的应用程序,同时保持了吸引人的数据流模型特征:自动容错,位置感知(locality-aware)调度,可扩展性。RDD允许用户在不同的查询中在内存中直接缓存工作集,大大加块了未来的重用速度。
RDD提供了共享内存的高度结构化数据:它是只读的,只能通过其他RDD上的确定性转换(例如,map、join和group-by)来创建。然而这个限制允许低开销容错。与分布式共享系统内存相比,它需要检查点和回滚,RDD则通过血缘(lineage)重建丢失的分区:RDD有足够的信息来了解它是如何从其他RDD中派生出来的,从而只重建缺失的分区,而无需检查点任何数据。虽然RDD不是通用的共享内存抽象,它代表了表达性,可扩张性和可靠性的甜蜜点,我们已经发现它们非常适合各种数据并行应用程序。
我们的工作不是第一个注意到非循环束流的局限性工作。例如,Google的Pergel是一个专门的用于迭代图算法的程序模型,而Twister和HaLoop提供了一个迭代Mapreduce模型,然而,这些系统对于特定应用的类型提供了受限的模式。作为对比,RDD对于有工作集的应用是更加普通的抽象。它允许用户清楚的命名和指定中间结果,他只他们的分区,并且在它们选择的操作中使用它们(而不是给定Mapreduce的运行时步骤来循环)。我们展示了RDD可以用来表达Pregel和迭代的Mapreduce,以及这些程序都不能很好的满足的应用程序,比如交互式数据挖掘工具(其中一个用户加载数据集到内存中斌进行临时的查询)。
我们已经在系统中实现了RDD叫做spark,它已经在我们的机构中被用于开发多种并行应用。Spark提供了语言几成变成接口类似Scala程序语言的DryadLINQ,让他变得容易被用户书写并行任务。此外,Spark可以备用来交互式地从scala解释器查询大数据集。我们相信Spark是第一个系统,它允许在集群上有效的通用的程序员语言与分析大数据进行交互。
在 Spark 中,lineage(血缘)指的是一个 RDD(弹性分布式数据集)如何从其他 RDD 派生出来的计算过程的历史记录。简单来说,lineage 描述了数据变换的过程,即从初始数据源(如 HDFS 或其他存储系统)经过一系列操作(如 map、filter、join 等)产生当前 RDD 的关系。
我们通过微基准测试盒用户应用衡量来评估RDD。我们展示了在迭代应用中,Spark的性能比hadoop的高了20倍,提高了40倍的数据分析报表性能,可以用来扫描1TB的数据集同时延迟为5-7秒。此外,我们已经在Spark上实现了Pregel和Haloop程序模型,包括它们采用的放置优化,作为较小的库(分别为100和200行的Scala)。最终我们使用了rdd的确定性特性来构建rddbg,一个Spark的debug工具,允许用户使用 RDD 的 lineage(血缘)重建在作业中创建的任何 RDD,并在常规调试器中重新运行任务。
2 弹性分布式数据集(RDD)
2.1 目标和总览
我们的目标是提供一个抽象,它支持带有工作集的应用程序(即在多个并行操作中重用中间结果的应用)同时保留Mapreduce和相关模型的特性:自动容错本地化调度和可扩展性。RDD应该数据流模型那样易于编程,但是能够有效的表达工作集的计算过程。
RDD血缘也可以叫RDD依赖关系图。当我们计算一个RDD时,会以来多个父RDD的数据,而这些父RDD又会依赖于它自身的父RDD,这样RDD之间的依赖就形成了有向无环图,这些关系被记录在一个图中,这是就是RDD血缘。
在我们希望的特性中,最难有效支持的是容错。通常,有俩个操作可以让分布式数据集容错:检查点数据或者记录它所做的更新。在我们的目标环境中(大尺度的数据分析),检查点数据是昂贵的:它需要在数据中心的网络上跨越机器进行大规模的复制,那种贷款通常比机器内部的内存带宽低的多,并且还会额外的增加内存空间(在内存中备份数据将会减少可以缓存的容量,同时记录它到词频中将会拖慢应用程序)。因此,我们选择了记录更新。然而记录更新如果很多,同样是昂贵的。因此,RDD只支持粗粒度的转换,其中我们可以记录单个操作以用于多个记录。然后我们使用构建RDD(它的血缘)并且使用它回复丢失的分区来回忆起转换序列。
虽然支持粗粒度的转换约束了程序模型,我们已经发现RDD适合广大的应用。在现实中,RDD非常适合数据并行批处理分析应用程序,包括数据挖掘,机器学习和图算法,因为这些程序自然会多许多记录做相同的操作。RDD不适合需要异步更新共享状态的程序,比如网络爬虫,然而,我们的目标是提供一个有效的。然而我们的目标是为了大量的分析型应用程序和提供有效的程序模型,并且把其他应用留给专门的系统。
2.2 RDD抽象
理论上,RDD是一个只读的,记录的分区集合。RDD只能通过以下之一来创建 1 稳态存储中的数据集 2 其他已经存在的RDD。我们把这个叫做操作转换,以便他们与程序可能在RDD上应用的其他操作区分出来。转换(transformation)操作包括了map,filter,group By和join。
RDD不需要在所有时间都被具现化。相反,RDD由足够的信息来了解它是如何从其他数据集派生出来的(即 血缘)以便从稳态的存储中数据计算它的分区。
2.3 程序模型
在Spark中,RDD是通过对象表示的,使用这些对象的方法调用转换。
在定义一个或多个RDD之后,程序员可以在操作中使用它们,这些操作返回值给应用或者输出数据到存储系统。这些动作包括计数(它返回RDD中元素的数量),收集(它返回元素本身),和save(它输出RDD到存储系统中)在spark中,RDD仅在第一次操作中使用时才计算(即 它是懒评估的),允许运行时在构建RDD时对几个转换进行流水线化。
一个RDD如果需要被多次使用,每次计算的话会耗费大量的时间。所以就可以缓存起来。
程序也可以控制RDD的俩个其他方面:缓存和分区。用户可以要求RDD进行缓存,在这种情况下,运行时可以存RDD分区,它已经被计算出来以加速未来的重用。缓存RDD通常存储在内存中,但是如果内存不够可能会溢出到磁盘中。
最后RDD允许用户选择分区顺序基于key或者相关的每一条记录。我们当前支持hash和范围分区。例如,一个应用可能要求俩个RDD用同样的方式进行hash分区(将具有相同秘钥的记录放在一台机器上)以加速它们之间的连接。在Pergel和Haloop中,跨迭代的保持一致的分区防止是主要优化点之一,所以我们让用户进行优化。
2.4 例子:控制台日志挖掘
我们通过举一个例子来描述RDD。假设大型网站出现了错误,并且一个操作想要查询TB级别的日志位于HDFS中以找到原因。使用spark,我们对于RDD的实现,操作员可以将日志中的错误加载到跨一组节点的RAM中,并以交互方式查询它们。它首在spark解释中先输入以下的scala代码:
lines = spark.textFile("hdfs://...")
errors = lines.filter(_.startsWith("ERROR"))
errors.cache()
第一行定义了一个由HDFS文件支持的RDD(作为文件行的集合),其中第二行从中派生一个过滤后的RDD。第三行请求对于errors进行缓存。注意,filter的参数是闭包的scala语法。
此时,还没有在集群上执行任何工作。然而,用户现在可以在动作中使用RDD,即统计信息的数量。
errors.count()
用户可以在RDD上执行进一步的变换并且使用哪个结果,
// Count errors mentioning MySQL:
errors.filter(_.contains("MySQL")).count()
// Return the time fields of errors mentioning
// HDFS as an array (assuming time is field
// number 3 in a tab-separated format):
errors.filter(_.contains("HDFS"))
.map(_.split(’\t’)(3))
.collect()
在第一个相关的errors运行之后,spark会缓存异常的部分到内存中,大大加速了后续的计算啊。注意,RDD,lines永远不会被缓存。这是令人满意的,因为错误消息可能只是数据的一小部分(足够小可以塞入内存中)。
图1:我们例子中的三次查询的血缘图。盒子请求RDD并且箭头表示转换。
最后为了描述我们的模型如何实现容错,我们在图1的重展示了RDD的血缘图在我们的三个查询中。在这次查询中我们伴随着errors开始,在行上筛选的结果,并运行手机之前应用。spark将会流水线话之后的俩个变化,并且发送一个任务集合到保存errors的分区以计算它们。如果错误的分区丢失了,spark通过使用只过滤对应分区的数据行来重构它们。
2.5 RDD vs 分布式共享内存
DSM 提供了一种抽象,让程序员可以在不直接处理底层网络通信的情况下,使用类似于共享内存的方式来编写分布式应用程序。通过 DSM,程序员可以专注于逻辑开发,而不需要过多关注节点之间的数据同步和通信细节。
方向 | RDD | 分布式共享内存 |
---|---|---|
读 | 批或者细粒度 | 细粒度 |
写 | 批量转换 | 细粒度 |
一致性 | 琐碎的(不可变) | 直到应用、运行时 |
故障恢复 | 使用血缘并且细粒度低开销 | 需要检查点和程序回滚 |
拖尾缓和 | 可用于备份任务 | 困难 |
任务放置 | 基于本地数据自动进行 | 到应用程序(运行时的目标是透明度) |
如果没有足够内存的行为 | 类似于现有的数据系统 | 表现不佳(swapping) |
表1:RDD对比分布式共享内存
为了进一步理解RDD作为分布式内存抽象的能力,我们将它与分布式共享内存进行比较在表1中。在DSM系统中,应用读取和写入到全局地址空间中任意的位置。(注意在这个定义下,我们不仅包括传统的共享内存系统,还包括应用程序系统可能通过分布式哈希表或者文件系统共享数据的系如Piccolo)。DMS是非常一般化的抽象,但是这个一般化使得在商品集群上以高效和容错的方式变得更加困难。
RDD和DSM的主要不同是RDD可以只创建(“写入”)通过一批比划,DSM允许读取和写入每个内存位置。浙江限制RDD应用的批量写入性能,但是允许更有效的容错。现实中,rdd不需要承担检查点操作所带来的开销,因为它们可以通过血缘来恢复,只有丢失了RDD分区才需要在失败是重新计算,它们可以并行的在不同节点上进行重算,不需要回滚整个程序。
一个有趣的观察是,RDD也容忍系统慢速节点(掉队者)通过运行任务的备份拷贝,类似Mapreduce。备份任务在DMS中很难进行实现,因为俩个副本将对相同的内存地址进行读写。
RDD模型还提供了相当于DSM的另外俩个好处。第一,在RDD上的一批操作,运行时任务可以基于数据的局部性调度任务来提高性能。其次,当内存不足够存储它们时,缓存的RDD会优雅的降级,只要它们只用于基于扫描的操作。不适合RAM的分区可以存储在磁盘上,将会提供当前数据流系统相似的性能。
最后一个比较点是读取的粒度。我们的许多在RDD上的操作(统计和手机)执行批量读取它会扫描整个数据集,这也让我们可以在数据集附近安排它们。然而,RDD可以被用于在数据附近安排它们,通过哈希或者范围分区的RDD上进行查找。
3 Spark程序接口
Spark在Scala中提供了RDD抽象,通过语言进行交互。Sclaa是一个静态类型函数和基于JAVA VM的面向对象语言。我们选择Scala是因为它的简洁(它对于交互使用特别有用)和效率(因为静态的类型)。然而,RDD的抽象不需要函数语言;通过类型代表用户函数就像Hadoop做的一样,也可以提供其他语言的RDD。
图2:spark运行时,用户的驱动程序组织多个工作节点,它们读取分布式文件系统的数据块,并可以在内存中缓存计算的RDD分区。
为了使用Spark,开发这编写驱动程序,它连接集群到运行的工作节点上,如图2所示。驱动定义了一个或多个RDD并且在它们上面执行操作。worker是重启存在的进程,它可以将RDD分区缓存到内存中,如同Java的对象。
在2.4章节的例子中可以看出,用户提供了参数给RDD操作就像通过传递闭包(函数字面量)进行映射。scala将每个闭包对象表示为一个java对象,并且这些对象可以被序列化并且加载到另一个节点上,以在网络中传递闭包。Scala也保存了所有绑定的变量表中作为java对象的域。例如,可以编写如下代码var x = 5; rdd.map(_ + x)
以增加5到每个RDD元素上。总的来说Spark的语言集成类似于DryadLINQ。
RDD本身是一个被元素类型参数化的对象。例如,RDD[int]是一个整数类型的RDD。然而,我们大部分的例子省略了,因为Scala支持类型推断。
虽然我们暴露的RDD方法在Scala中,概念上很简单,我们不得不通过反射来解决Scala闭包对象的相关问题。我们还需要做更多的工作以确保Spark可以从Spark解释其中使用。尽管如此,我们并不需要修改Scala编译器。
3.1 spark中的RDD操作
表2中列出主要的RDD变换和在Spark中可用的动作。我们给出每个操作的签名,在方括号中展示类型签名。回想一下,当操作启动计算以返回值到程序或者或者将数据写入到外部存取器时,变化是懒加载的它被定义在新的RDD中。
记住一些操作,比如加入,只在键值对的RDD上可用。同样的,我们的函数名字与Scala和其他的函数式语言中的其他API相匹配;例如,map是一对一的映射,而flatMap将每个输入映射到多个输出(类似Mapreduce中的map)。
除了这些操作,用户可以要求RDD进行缓存。此外用户可以获得RDD分区顺序,它由Partitioner类表示,并且根据它划分另一个RDD。类似groupByKey,reduceByKey和sort操作自动的手机哈希或者范围分区的结果。
4 应用例子
4.1 机器学习
逻辑回归通常用来解决二分类问题,将输出的函数映射到0到1的范围区间。
许多机器学习算法在本质上是迭代的,因为它们运行迭代优化程序,比如梯度下降以优化目标函数。如果工作集适用于整个集群的RAM,这个算法可以大大的加块速度。此外这个算法通常使用批量操作类似map和求和,让他们可以简单的被RDD表达。
一个例子,以下的程序实现了逻辑回归,一种常用的分类算法,它查询点的几何能够被朝平面w所分割(如垃圾邮件文件)。这个算法用了梯度下降:它在随机点开始w,在每一次迭代上将w的函数除以数据,让w向一个更号的方向移动。
代码演示的是一个分布式梯度下降。
val points = spark.textFile(...).map(parsePoint).cache()
var w = // random initial vector
for (i <- 1 to ITERATIONS) {
val gradient = points.map{ p =>
p.x * (1/(1+exp(-p.y*(w dot p.x)))-1)*p.y
}.reduce((a,b) => a+b)
w -= gradient
}
我们通过定义缓存RDD叫做points作为map在文本文件上转换的结果,该转换将每一行文本对象解析为point对象。我们然后通过对w的函数求和分别运行map和reduce任务。在7.1章中我们展示了以这种方式缓存点可以大大提高磁盘加载数据文件并在每个步骤上解析它的速度。用户使用Spark实现迭代的机器学习算法的其他示例包括k-means,它像逻辑回归一样每次迭代运行一个map、reduce对;期望最大化(EM)算法,它在两个不同的 map/reduce 步骤之间交替进行;并且交替最小二乘矩阵分解,一种协同过滤算法。Chu等人已经证明迭代MapReduce也可以用于实现其他常见的学习算法。
4.2 使用mapreduce
mapreduce模型可以很容易的被使用RDD所表示。给定一个元素类型为T的输入数据集,和函数$myMap:T \Rightarrow List[(K_i,V_i)]$以及$myReduce:(K_i,List[V_i])\Rightarrow List[R]$ 可以写为
这里的data是一个分布式数据集,myMap是一个用户定义的map函数,将每一项转化为零个或多个新的数据项。
groupByKey将flatMap的输出按键(k)分组。
map函数对分组后的数据进行处理,其中myReduce函数是一个用户定义的规约函数。
第一行map,第二行shuffle,第三行reduce
data.flatMap(myMap)
.groupByKey()
.map((k, vs) => myReduce(k, vs))
如果作业有组合器可以写为
data.flatMap(myMap)
.reduceByKey(myCombiner)
.map((k, v) => myReduce(k, v))
reduceByKey操作在映射器节点上执行部分聚合,就像MapReduce的组合器一样。
mapreduce中就有组合器的概念先将数据聚合在尽显传输,求和的时候先把数据求和再传输能节省大量带宽。
4.3 Pregel 使用RDD
Pregel是一个图应用的程序模型,它基于Bulk Synchronous Parallel范式。程序作为一些列协调迭代序列,叫做超级步。在每个超级步上,每个途中的顶点运行用户计算的逻辑,它更新相关的顶点状态,改变图拓扑,并且发送消息到其他的顶点以使用超级步。这个模型可以被表达为许多的图算法,包括最短路径法,二分图匹配算法,和网页排名算法。程序关联当前的pageRankr作为每个顶点相关联的状态。在每个超级步中,每个顶点对于相应的临边的共线都是r/n。其中n是它邻居的个数。在下一个超级步开始时,每个顶点更新它的排名$\alpha/N+(1-\alpha)\sum c_i$,其中接收到的贡献N是顶点的总数。
pregel分区输入图穿过工作节点并存储在内存中。在每个超级步中,工作者通过类似mapreduce的shuffle交换消息。
Pregel的沟通模式可以被使用RDD表达,我们展示了图3和如下的代码。关键思想是将每个超步的顶点状态和发送的消息存储为rdd,并按顶点id对它们进行分组,以执行shuffle通信模式。然后,我们对每个顶点ID的状态和消息应用用户函数,生成一个新的RDD (VertexID, (NewState, OutgoingMessages))对,并且map它以分理出下一个迭代状态和信息。
val vertices = // RDD of (ID, State) pairs
val messages = // RDD of (ID, Message) pairs
val grouped = vertices.cogroup(messages)
val newData = grouped.mapValues {
(vert, msgs) => userFunc(vert, msgs)
// returns (newState, outgoingMsgs)
}.cache()
val newVerts = newData.mapValues((v,ms) => v)
val newMsgs = newData.flatMap((id,(v,ms)) => ms)
该实现的一个重要方面是分组的、newData的和newvert的RDD将以与输入RDD、顶点相同的方式进行分区。作为结果,定点状态不会离开启动它们的机器,有些类似Pregel,减少了工作的沟通成本。这个分区是自动发生的,因为cogroup和mapValues保留了它们输入的RDD分区。
全部的Pregel程序模型包括几个设施包括组合器。我们在附录A中讨论如何实现。在这一章节的剩余部分我们讨论Pregel中的容错,并且展示RDD如何提供相同的容错特性并且也可以减少检查点的数据量。
我们在Spark上用100行Scala代码,已经实现了类似Pregel的API,在7.2章使用pageRank来评估它。
4.3.1 pregel的容错
Pregel当前的定期检查点顶点状态和信息以进行容错。然而,作者还描述了通过记录其他节点上发送的消息单独重建分区受限的恢复工作。RDD可以同时支持者俩个方法。
仅通过4.3章中的实现,Spark可以总是血缘重建顶点和消息RDD,但是由于谱系链长,恢复可能是昂贵的。因为每个迭代器的RDD依赖于前一个,丢失一个节点可能导致丢失一些分区的整个状态的迭代版本,需要重新执行级联来重建每个丢失的分区。为了避免这一点,用户可以在定点和消息RDD上分别调用save函数来进行检查点它们到持久化存储中。一旦它完成了,Spark会自动的只重算失败部分丢失的分区(而不是回滚整个程序)。
最后,我们注意到rdd还可以通过表达更有效的检查点方案来减少所需的检查点数量。在许多Pregel任务重,定点状态同时包括可变的和不可变的组成。例如,在PageRank中,一个顶点的邻居列表从来不会改变,但它的排名会改变。在这样的例子中,我们可以将不可变的数据放在单独的RDD中,使用短血缘链,并且只检查点可变的状态。我们在图4中描述了这个方法。
在PageRank中,不可变的状态(邻居列表)远大于可变状态(浮点值),因此可以大幅度减少开销。
4.4 HaLoop使用RDD
Haloop是Hadoop框架的扩展版本,被设计为提升迭代mapreduce程序的性能。应用使用HaLoop程序模型进行表达,用早起迭代的reduce阶段的输出作为下一个迭代map阶段的输入。它循环感知任务调度器以确认在每个迭代中,处理相同数据分区的连续的map和reduce任务被安排在同一台物理机上。确保迭代之间的取不下减少了机器之间的传输,并且使得数据可以被缓存在本地磁盘在晚些的迭代重用。
我们已经在Spark上实现了类似HaLoop的API使用RDD来表达HaLoop优化器。通过partiitionBy确保迭代之间的一致性分区,每个节点的输入和输出被缓存用于晚些阶段。这个库使用了200行Scala代码写完。
4.5 不适合RDD的应用
如同在章节2.1中的讨论,RDD非常适合那行执行批处理的应用,它们对所有数据集的元素做相同的事情。在这个例子中,RDD可以有效的记住每一个在血缘图上的步骤例的转化函数,并且可以恢复丢失的无需记录大量的数据。RDD不适合异步进行细粒度更新的应用,比如网站应用的存储系统或者增量的Web爬虫和索引。对于这些应用,使用执行传统更新日志和数据检查点的系统效率更高,比如数据库RAM Cloud,Percolator,Piccolo。我们的目标是提供有效的面向批分析应用的程序模型,将这些异步应用程序留给专门的系统。
5 RDD表示与任务调度
spark中的RDD已经有些过时了被数据帧所取代了,DataFrame是显示列的RDD
DataFrame支持谓词下推等技术。假设你有一个查询,需要从数据源(如 Parquet 文件、Hive 表、JDBC 数据库等)中筛选特定的记录。谓词下推会将查询中的过滤条件(即谓词,如
WHERE age > 30
)直接传递到数据源,让数据源只返回满足条件的数据,而不是将所有数据加载到 Spark 中再进行过滤。DataFrame 是基于RDD的,同时增强了RDD,每个操作最终最终会被转化为RDD操作。
在spark中我们希望在RDD上支持支持广泛的可组合的转换,而不需要定义每个转换的调度器,并且我们希望捕捉这些所有变换的血缘信息。我们给RDD设计了一个小的公共的交互接口让他更容易达到这个目标。
简单来说,每个RDD都有一组分区,它是数据集的原子的部分;一组对父RDD的依赖,它捕获了血缘;基于父节点计算RDD的函数;以及关于分区调度和数据防止的元数据。例如,表示HDFS文件的RDD为文件的每个块都有一个分区,并且知道每个块在HDFS的哪个节点上。同时次RDD上的映射结果具有相同的分区,当计算父数据元素时,应用map函数到父数据。我们在Table3概要了RDD接口。
操作 | 意义 |
---|---|
partitions() | 返回一个分区对象的列表 |
preferredLocations(p) | 列出由于数据局部性而可以更快访问分区p的节点 |
dependencies() | 返回相关的列表 |
iterator(p, parentIters) | 给定父分区的迭代器,计算分区p的元素 |
partitioner() | 返回指定RDD是否为散列/范围分区的元数据 |
表3:Spark中的RDD交互接口
图5:展示了其他的例子。
在设计这个界面时中最有趣的交互问题是如何表示RDD之间的依赖。我们发现,我们发现将以来分成俩中类型即充分有用:窄以来,子RDD的每个分区依赖于常数的父分区(与它的大小不成比例),和宽依赖,子节点的每个分区依赖于所有父节点的分区。例如,map导致了窄依赖,而连接导致宽依赖关系。(除非父是哈希分区的)。 在图5中我们展示了例子。
这个讨论有俩个原因。第一窄依赖允许流水线的执行在一个集群节点上,它可以计算所有的分区。可以逐个元素的基础上应用映射和过滤。相反,广泛的依赖关系要求所有来自父分区的数据可用并且在节点之间使用类似Mapreduce的操作进行shuffle。第二,在节点故障之后进行恢复在窄依赖上更有效,因为只有丢失的分区需要重算,它们可以在不同节点上并行进行重算。相反在宽依赖的血缘图的中,一个失败可能导致RDD的所有祖先中的某些部分丢失,需要计算重新执行。
由于我们为RDD选择了通用接口,我们可以实现大部分Spark中的转换只需要少于20行代码。我们在5.1中概述了一些例子。然后我们讨论如何使用RDD接口进行调度(§5.2)。最后,我们讨论在基于rdd的程序中检查点数据何时有意义(5.3)。
5.1 RDD实现的例子
HDFS 文件:我们示例中的输入 RDD 都是 HDFS 中的文件。对于这些 RDD,partitions
返回文件中每个块对应的一个分区(每个 Partition 对象中存储着该块的偏移量),preferredLocations
给出该块所在的节点,iterator
读取该块。
map:对任何 RDD 调用 map
会返回一个 MappedRDD 对象。这个对象具有与父 RDD 相同的分区和首选位置,但在其 iterator
方法中应用传递给 map
的函数来处理父 RDD 的记录。
union:对两个 RDD 调用 union
会返回一个 RDD,其分区是父 RDD 分区的并集。每个子分区通过对对应父分区的窄依赖来计算。
sample:采样类似于映射,不同之处在于 RDD 为每个分区存储一个随机数生成器的种子,以确定性地从父 RDD 中采样记录。
join:连接两个 RDD 可能会导致两种窄依赖(如果它们都使用相同的分区器进行哈希/范围分区),两种宽依赖,或两者的混合(如果一个父 RDD 有分区器,另一个没有)。
5.2 spark 任务调度
我们的调度器使用RDD结构以找到每个动作的有效的执行计划。调度器的接口是runJob函数,它接受一个RDD来工作,一组感兴趣的分区和一个要在分区上运行的函数。这个接口充分的表达了spark中的动作(count,collect,save等)。
总的来说,我们的调度器类似,Dryad,但还要考虑哪些RDD分区在缓存中可用。调度器检查目标RRDD的血缘图以构建执行阶段的DAG。每个阶段尽可能包含多的具有窄依赖的流水线转换操作。阶段的边界是宽依赖所需的shffle操作,或者任何能使父RDD的计算短路的缓存分区。图6展示了一个例子。当每个阶段的父节点结束时,我们启动任务来计算缺失的分区。
图6:这个例子展示了spark如何计算任务阶段。有实心轮廓的是RDD。分区是带阴影的矩形,如果它被缓存了则为黑色。为了运行一个动作在RDD G上,调度去在宽依赖上构建阶段,管道在每个阶段内构建窄转换。在这个例子中,阶段1不需要运行所以我们运行2,然后3。
调度器基于数据局部性防止数据以最小化沟通。如果任务需要处理缓存分区,我们发出我们将它发送到具有分区的节点。否则,如果任务处理分区包含了RDD为其提供首选位置(因为HDFS中数据的局部性),我们发送到到这些为位置。
为了宽依赖(即shuffle依赖),我们在父分区持有的节点上当前实现了中间记录以简化错误恢复,类似Mapreduce实现map输出一样。
如果任务故障,我们重新运行它在另一个节点上,只要它的父节点仍然可用。如果一些阶段变得不可用了(即,shffle的map端的输出丢失了),我们重新提交任务已计算这些状态丢失的分区。
最后查看动作,它允许用户从哈希或范围分区的RDD中获取元素,提出了一个有趣的问题。当lookup被驱动程序调用,我们可以使用现有的调度器结构构件所在的分区。然而,我们也尝试允许集群上的任务(映射)以调用lookup,让用户将RDD视为大型分布式哈希表。在这个例子中,在任务和RDD之间的依赖被查找而没有被完全捕获(因为工作节点调用lookup),但是我们让任务告诉调度器,如果它找不到任何节点上注册的RDD缓存分区,则计算有问题的RDD。
5.3 检查点
虽然RDD跟踪的血缘信息总是允许程序从故障中恢复,这种恢复对于有长血缘链的RDD来说可能很耗时。例如在Pregel任务中,每个迭代器的顶点状态和消息依赖于前一次迭代。因此他可以具有长血缘的RDD检查点进行稳态存储存储。
一般来说,检查点被用在长血缘图的RDD构成的长依赖上。集群上一个节点的失败可能会导致一些来自父RDD的数据片的丢失,需要完全重新计算。相反,对于在稳态存储中的数据的窄依赖,比如我们逻辑回归的点,(4.1章)和我们优化变体中不可变的顶点状态pregel,检查点可能永远都不值得。当一个节点发生故障时,这些RDD丢失的分区可以在其他节点上重新计算,只需要整个RDD的一小部分成本。
spark现在提供了检查点API但是将哪些数据要检查点的决策留给了用户。在未来的工作中,我们计划使用有助于成本的分析自动进行检查点依已选择最好的在RDD血缘图中的切口进行检查点。
记住因为RDD是只读的,它们可以在后台被检查点,而不会产生任何维护一致性开销(即copy-on-write模式,分布式快照或者程序暂停)。
6 实现
我们使用大约10000行Scala代码实现了Spark。这个下同可以用来处理任何Hadoop数据源(HDFS,HBase)作为输入,使它易于集成到Hadoop环境中。我们不需要修改Scala编译器;Spark是作为一个库实现的。
我们讨论实现中技术上有趣的三个部分 :我们定义Scala的解释器允许交互使用Spark(6.1)缓存管理(6.2),和debug工具rddbg(6.3)
6.1 解释器集成
Scala引入了交互式的shell类似于Ruby和Python。考虑到内存数据获取的延迟,我们想要让解释器中交互式的运行Spark,作为挖掘大数据集的大规模并行计算器。
Scala的解释器通常通过用户输入的每一行编译一个类,加载它进入JVM,并且在它上面执行函数来运行。这个类包含了一个单例对象,它包含了该行的变量或者函数,并且在初始化方法运行该行代码。例如,如果用户输入var x = 5接下来跟一个println(x),解释器定义一个类调用包含x的Line1,并且将第二行编译为println(Line1.getInstance().x)
我们对Spark的解释器做了俩个修改
var x = 5 val f = () => println(x)
在这个例子中f就是一个闭包,但当f被序列化传输到工作节点的时候会出问题,因为Java在序列化闭包的时候不会传播对象图.它只会将闭包本身(函数
f
)序列化,但没有序列化Line1
类和它的成员变量x
。这样工作节点就得不到x的值了。spark为了让闭包能够正常工作,修改了scala代码的生成逻辑,使得每一行代码创建的对象能被直接引用,而不是通过静态方法的访问。
- Class shipping:为了让工作节点执行类创建在每一行上的字节码,我们让这些类的交互服务在HTTP上传输。
- Modified code generation:一般,为每行代码创建单例对象是通过其对应类的静态方法访问的。这意味着当序列化引用的前一行定义的变量闭包时,比如上例中Line1.x,Java不会遍历对象图以传输包装 x的Line1 实例。因此工作节点将不会接收到x。我们修改了代码的生成逻辑以直接引用每个对象的实例。
图7中展示了,解释器如何在我们的更改之后将用户键入的一组行转换为Java对象。
我们发现 Spark 解释器在处理作为我们研究的一部分获得的大型跟踪数据以及探索存储在 HDFS 中的数据集时非常有用。我们也计划使用它作为作为一个交互工具提供高级别的数据分析语言,比如SQL和MATLAB的变体。
6.2 缓存管理
因为大部分是扫描操作,因此采用lru也就是最近最少使用来进行替换。同时程序员也可以决定哪些RDD是更重要的。简单来说是缓存存不下RDD了怎么办。
我们的工作节点缓存了RDD分区在内存中,就像Java对象一样。我们在RDD的级别上使用了LRU置换策略(即我们不会为了从同一RDD加载其他分区而从RDD中驱逐分区),因为大部分操作是扫描。我们定义这个简单的策略目前在我们用户应用上工作的很好。想要更多控制的程序员也可以通过保留优先级作为参数传递给cache来设置RDD的保留优先级。
6.3 RDD程序的Debug工具
虽然我们最初设计RDD是为了容错而进行确定性的重算,这一特性也便于调试。我们构建了叫做rddbg的工具它使用程序记录的血缘信息,让用户:
- 通过程序和查询交互式的,重建任何RDD的创建
- 通过输入它所以来的重新计算的RDD分区,在单进程Java调试器中重新运行作业中的任何任务(即 jdb)
我们强调,rddbg不是一个完整的重放调试器:特别是,它不会重放不确定的代码或行为。但是,它对于发现逻辑错误以及某个任务持续缓慢的性能错误非常有用(由于数据的歪斜或者异常的输入)
举了一个实际的例子,解释如何使用
rddbg
来修复一个用户在垃圾邮件分类任务中的错误。在这个任务中,所有的迭代结果都是零。通过重新执行一个reduce任务,调试器发现了一个问题:输入的权重向量(存储在自定义的稀疏向量类中)是空的。由于稀疏向量类中的未设置元素返回零,并不会抛出运行时异常,因此问题没有被及时发现。通过在向量类中设置断点,重新执行任务后,调试器进入了反序列化代码,发现向量类的字段名称数组在反序列化时为空,从而找到了问题的根源:稀疏向量类中的数据字段被错误地标记为transient
(一个过时的注解),导致它没有被正确序列化。
例如,我们使用rddbg来修补用户在垃圾邮件分类任务重的bug,其中所有的迭代器只产生0值。在调试器中重新执行reduce任务很快的表明,输入权重向量(存储在自定义系数向量类中)出乎意料的为空。由于从系数向量未设置元素的读取按照设计范围0,因此不会发生运行时异常。在向量类中设置一个断点并且再次执行该任务,很快就会进入反序列化代码,其中我们发现被反序列化的向量类字段数组也是空的。这允许我们诊断出bug:稀疏向量类中的数据字段错误地被标记为瞬态(transient)(与以前的序列化库一起使用的过时注释),防止它被序列化。
rddbg为程序执行增加了最小的开销,因为程序必须序列化并通过网络发送与每个RDD相关的所有闭包。我们只是将它简单的记录到磁盘上。
7 评估
8 相关工作
9 结论
我们提出了弹性分布式数据集(rdd),这是一种用于商品集群上数据并行应用的分布式内存抽象。rdd通过工作集支持广泛的应用程序,包括ai图算法和交互式数据挖掘,同时保留数据流模型的吸引人的特性,如自动故障恢复、掉队缓解和位置感知调度。这是通过限制编程模型来实现的,以允许有效地重建RDD分区。我们对rdd的实现表现得更好Hadoop在迭代作业中最多可以提高20倍,并且可以用于交互式查询数百gb的数据。