Pregel:一个大规模的图计算系统

概要

许多计算问题涉及大型的图。标准的例子包括了网页图和多样的社交网络。这些图的尺度,在某些情况下有数十亿个顶点,万亿条边-对有效的处理它们带来了挑战。在这篇paper中我们提出了合适这个任务的计算模型。程序被表示为一个迭代序列,在每个迭代中,顶点(vertex)可以接收前一个迭代发送的消息,发送消息到其他节点,并且修改其自身的其出边的状态,或者改变图的拓扑。这个以顶点为中心的方法足够灵活,可以表示广泛的算法集。这个模型是为了高效设计的,在上百台商用计算器上实现可伸缩和容错,并且它隐含的同步性让推理程序更加容易。分布式的细节被隐藏在了抽象的API中。结果是一个用于处理大型图的计算框架它具有表现力,且易于编程。

1 介绍

different flavors of clustering 比如聚类算法。

最小切割:将图分成俩个部分,使得切割段的边的权重之和最小。

联通分量是图论重的一个概念,指的是一个图中所有联通子图(每一个图中的任意俩个阶段都有路径相接)。在一个无向图中,连通分量是最大子图,其中所有节点都能通过边相互到达,如果一个图是联通的,那么它只有一个联通分量。

网络让web图是一个人们的分析和研究对象。Web2.0激起了人们对于社交网络的兴趣。其他大的图-例如传输路径,新闻的相似点,疾病爆发的路径,科学著作的关联之间-已经处理了几十年了。频繁使用的算法包括最短路径计算,不同风格的集群,以及页排名主题的变化。有很多的图计算问题在真实的值上,即 minimum cut以及联通分量。

有效的处理大尺度的图是挑战。图算法通常表现出交叉内存访问局部性,每个节点的工作和执行过程中改变并行度很少,分布在多个机器上恶化了局部性问题,并且增加了机器会在计算时故障的概率。尽管图计算无处不在,并且在商业中很重要,据我们所知,在大规模分布式环境中,没有可伸缩的通用系统能够在任意图的表示上实现任意的图算法。

实现一个算法以处理大尺度的图通常意味着在以下做出选择:

  1. 打造一个定制的分布式基础设施,通常需要达令的实现工作,对于每个新的算法或图表示,都必须重复这个过程。
  2. 依赖于显存的分布式计算平台,通常不适合图计算的表示。例如,Mapreduce,非常适合大尺度的计算问题。它有的时候被用在大型的图计算,但是这会导致劣化的性能和可用性问题。处理数据的基本模型已经扩展,以促进聚合以及类SQL查询,但是这些扩展通常不适合图算法,因为图算法更适合消息传递模型。
  3. 使用单机计算库如
  4. 利用单机图算法库,如BGL[、LEDA、NetworkX、JDSL ,斯坦福GraphBase,或FGL,限制了可以解决的问题的规模。
  5. 使用已经存在的并行图系统。并行BGL和CGMgraph哭解决并行图计算,但是不能解决容错或者其他大尺度分布式系统上的重要问题

这些选择没有适合我们的。为了解决分布式处理大尺度的图,我们构建了可扩展的和容错的带API平台,它有足够灵活的表达任意算法。这个paper讨论最终得到的系统,叫做Pregel和并且上报我们的经历。

在计算图中,outgoing edge出边指的是从一个节点出发,指向其他节点的边。

Pregel程序受到Valiant的批量同步并行模型的启发。Pregel包括了列带序列,叫做supersteps(超级步)。在superstep中框架概念上并行的执行用户在每个顶点定义的函数。该函数指定单个顶点V和单个superstep S的的行为。它可以读取超步S−1中发送给V的消息,发送信息到其他的顶点这些消息将在超级步S+1接受,并且定义V的状态和出边。信息通常沿着出边进行发送,但是信息可能会被发送到任何标识符已知的节点。

超级步是计算过程中的一轮迭代。每一次超级步结束之后都要进行一次全局同步。

以顶点为中心的方法让人想起Mapreduce,用户专注于本地操作,独立的处理每个项,并且系统将这些动作组合起来件个计算提升到一个大的数据集。通过设计模型非常适合分布式的实现:它不暴露任何机制来检测超级步中的执行顺序,并且所有的操作都是来自超级步S到超级步S+1。

这个模型的同步让推理程序的语义在实现算法的时候更加简单,并且确保了Pregel程序内在没有死锁和异步系统常见的数据竞争。原则上Pregel程序的性能应该比与异步系统的性能相争。因为典型的图计算有比机器更多的顶点,所以应该能够平衡机器的负载,超级步之间的同步不会增加过多的延迟。

2 计算模型

输入到Pregal计算的输入是一个有向图,其中每个顶点由字符串顶点表示符唯一的标识。每个顶点关联一个可修饰的用户定义的值。有向边与它们源顶点相关联,每条边由一个可修改的用户定义值和一个目标顶点表示符组成。

一个典型的Pregel计算包括 输入,当图被初始化,然后是一系列全局同步点分割的超级步直到算法通知和输出结束。每一个都执行相同的用户定义的函数,该函数表达了算法的逻辑。顶点就可以修改它自身的状态或者它出边的状态,接受前一个超级步中发送给它的消息,发送消息到其他顶点(在下一个超级步中接收),或者甚至改变图的拓扑。在这个模型中边不是一等公民,不关联计算。

算法的终止是基于每个节点投票停止。在超级步0时,每个节点都为活动(active)状态;所有的活动顶点都参与任何给定超级步的计算。顶点通过投票停止来使自己无效。这意味着顶点没有进一步的工作要做,除非外部触发或者Pregel框架,并且Pregel框架不会在后续的超级步中执行该顶点,除非它收到消息。如果通过新消息重新激活,顶点必须显式的重新激活。当所有顶点同时处于非活动状态并且没有消息正在传输时,整个算法将终止。这个简单的状态机制I在图1进行描述。

Pregel 程序是输出是由顶点程序输出的一组值。输入通常是同构的有向图,但这不是系统必要的属性,因为顶点和边可以在计算时被添加和移出。例如,可能会从一个大的图中选择一小组不相连的顶点。图挖掘算法可以从图中输出简单的挖掘聚合统计信息。

图2:最大值的例子。虚线是消息,阴影顶点已经投票停止。

图2描述了使用一个简单的例子描述了这个概念:给定一个强的连接图,其中每个顶点包含一个值,它将最大值传播到每个顶点。在每一个超级步中,任何消息中学习到较大值的顶点都将其发送给所有相邻顶点。当没有顶点在超级步中进一步改变。算法终止。

我们选择了一个纯消息传递模型,省略了远程读取和其他模拟共享内存的方式,有俩个原因。首先,消息传递的表现力是足够的,所以不需要远程读取。我们还没有发现任何的图计算它的信息传递不能被表示。其次,这个选择对性能更好。在集群环境中,读取远程机器的值会导致较高的延迟不能被隐藏。我们的信息传递模型允许我们传递模型允许我们通过批量异步传递消息来分摊延迟。

图算法可以被写为一些列的链式的Mapreduce调用。我们因为可用性和性能的原因选择了不同的模型。Pregel将顶点和边保存在执行的机器上,并且只对消息使用网络传输。然而Mapreduce本质上是功能性的,因此将图算法表示为链式的Mapreduce需要将整个图 的状态传递到下一个-通常需要更多的沟通和相关的序列化开销。此外,需要链式协调Mapreduce的步骤增加了编程的复杂性,这是在Pregal在超级步上的迭代所避免的。

特性MapReducePregel
数据传输方式阶段间传递完整图状态仅传递必要的消息
通信开销
编程复杂度高(需要手动协调阶段)低(抽象为 superstep 迭代)
适用性通用数据处理框架,非专为图算法设计专门针对图处理优化

3 C++ API

这一章我们讨论Pregel最重要的概念 C++ API,省略相对于机械的问题。编写一个Pregel程序需要继承预定义的Vertex类。它的模板提供了三种值类型,相关联的顶点,边,信息。每个vertex都有特殊类型的关联值。这种一致性似乎是限制性的,但是用户可以使用灵活的类型来管理它,就像protocol buffers。边和信息类型表现的和接近。

用户重写虚拟的compute()方法,它将在每个超级步上每个活动的顶点中执行。预定义的Vertex方法允许compute()去查询关于当前顶点和它的边的信息,并且发送信息给其他边。compute()可以通过getValue()检查与顶点相关的值或者通过MutableValue()修改值。它可以使用出边迭代器提供的方法来和修改出边的值。这个状态更新是立即可见的。以为它们的可见性仅限于修改后的顶点,不同的顶点上没有并发值访问的数据竞争。

与顶点及其边相关联的值是唯一在超级步之间持续存在的每个顶点状态。将框架管理的图状态限制为每个顶点或者边的单个值,简化了计算周期、分布故障和回复。

3.1 信息传递

顶点通过发送信息和其他的顶点进行沟通,每个顶点都包含一个小希和目标顶点的名称。信息值的类型通过模板参数指定为vertex类。

一个顶点可以发送任意的消息数量。当V的计算反复噶在超级步S+1中调用,所有在超步骤S中发送到顶点V的消息都可以通过迭代器获得。迭代器中的消息顺序没有保证,但是可以保证信息传递并且不回被复制。

一个常见的使用模式是顶点V迭代所有的出边,发送信息到每个边的目标顶点,如第四章表述的pagerank算法。然而,destory_vertex需要不是V的邻居。顶点可以从之前收到的消息中学习非邻居的标识符,或者可以隐式的知道顶点标识符。例如,图可以是一个完全图(每个不同的顶点之间都有边链接),具有众所周知的顶点标识符 V1 到 Vn,在这种情况下,甚至可能不需要在图中显式地保持边。

当任何信息的目标顶点不存在时,我们执行用户定义的处理器。例如处理程序可以创建缺失的顶点或者从顶点移除悬空的边。

3.2 组合器

发送一个信息到节点,特别是在另一台机器上到节点增加了一些开销。这个可以被用户在些例子中减少。例如,提供compute()接受一个integer数据,并且只关心和,而不关心单独的值。在这种情况下,系统可以将多个针对顶点 V 的消息合并成一条包含它们总和的消息。减少他们必须传输和缓冲的消息数量。

默认情况下不器用组合器,因为没有一个机械的方法可以找到一个有用的组合函数,这与用户的compute()方法的语义一致。为了实现这种优化,用户要继承Combiner类,并重写虚拟的Combine()方法。对于那些(如果有的话)消息被组合,呈现给合并器的分组,或合并的顺序,因此合并器应该仅对可交换和结合的操作启用。

对于一些算法,比如单源的最短路径(5.2章),使用接合器减少了超过四倍的流量。

3.3 聚合器

并行的 aggregator(聚合器)是一个全局沟通机制,监视器,以及数据每个顶点可以提供值到超级步S中进行聚合,系统使用reduction操作结合了这些值,并且结果的值可以给超级步S+1中所有顶点使用。Pregel包含许多预定义的聚合器,比如min,max,sum操作在各种证书和字符串之间操作。

Aggregate可以用来进行统计。例如,将总和聚合器应用于每个顶点的出度,得到图中的边总数。更复杂的规约操作可以生成统计的直方图。

聚合器也可以被用于全局协调。例如,compute()的一个分支可以被执行,直到and 聚合器确定所有顶点满足某些条件,然后另一个分支可以被执行直到结束。一个min或者max 聚合器应用到顶点ID,可以选择一个顶点在算法扮演一个特殊角色。

为了定义新的聚合器,用户集成了一个预定义的aggregator类,并指定如何从第一个输入值初始化聚合值,以及如何将多个部分聚合值减少为一个。聚合操作应该和交换操作是关联的。

没人情况下,聚合器只从单个超级步中减少输入值,但是它仍然可以定义一个使用所有超级步的输入值的粘性聚合器。这是很有用户的,例如,维护一个只有在编增加或者删除的时候才调整的全局计数边。

松弛操作就是通过逐步更新路径估计值,逼近最短路径的长度。

更高级的用途是可能的。例如一个聚合器可以用来实现分布式优先级队列以进行∆步最短寻路算法。每个顶点根据其暂定的距离分配到一个优先级桶。在一个超级步中,这些顶点将它们索引共享给min聚合器。在接下来的超级步骤中,最小值被广播给所有工作节点,最低索引桶中的顶点松弛边缘。

3.4 拓扑变化

联通图:无向图中,任意俩个顶点都有路径相通(间接也可以)则称为连通图

联通图不存在回路则称为树

生成树:它是一个联通图的联通子图,含有图中全部顶点。有且仅有足以构成一棵树的n-1条边(如果生成树中再加一条边则构成环)

最小生成树,连通图的生成树中,边权和代价最小的生成树。

比如要在n个城市之间铺设光缆,让n个城市之间都可以通信,但是光缆费用很高,城市间铺设的费用不同,因此目标是铺设光缆的费用最低。

一些图算法需要改变图的拓扑,比如聚类算法,可能用单个顶点替换每个集群,以及最小生成树算法,可能会删除除了树边以外所有的边。只要和用户compute()函数可以发送消息一样,它也快要发出请求添加或者删除图或者边。

多个顶点可能发出冲突的请求在同一个超级步中(俩个添加顶点V的请求初始值不同)。我们使用俩个机制来达到确定性:部分有序和处理器。

与消息一样,在请求发出之后修改在超级步中生效。在超级步中删除被首先执行,在移出顶点之前移出边,因为移出顶点隐含了删除它的边。此外根据移出,顶点添加在边之前,并且所有的变化都发生在对compute()调用之前。这种部分排序对大部分冲突产生了确定的结果

剩余的冲突通过用户定义的处理器解决。如果多个请求创建了相同的顶点在一个超级步中,那么系统只会随意选择一个,如果用户有需要可以指定一个冲突解决策略,通过在Vertex子类中定义合适的处理方法。相同的处理机制用来被解决由于多个顶点删除请求,或者多条边添加或删除请求引起的冲突机制。我们把解决方案交给处理器以让compute()的代码简单,它限制了handler和compute()之间相互的作用,但是在现实中不是一个问题。

我们的协调机制是懒的:全局变化不需要协调直到哪个点被应用。这种选择有助于流式处理。直觉告诉我们,冲突设计到对顶点V的修改由顶底V自身处理。

Pregel提供了完全的本地修改,即删除或者添加顶点拥有的出边或者删除它自己。本地修改不会引入冲突,使用更简单的序列编程语义使他们立即有效简化了分布式编程。

3.5 输入和输出

对于图有很多可能的文件形式,比如文本文件,一组在关系型数据库中的顶点,或者Bigtable的还干过中。为了避免特定的文件格式选择,Pregel把讲输入文件解析为图的任务与图计算解耦。类似的,输出可以以任意形式生成,并且以最适合给定应用程序的形式存储。Pregel哭提供了读取器和写入器给许多公共的文件形式,但是用户应对不常见的需求可以写自己的,通过继承类读取和写入器的抽象。

4 实现

Pregel是为了谷歌的集群架构设计的。每个集群包含上千个通用PC组织在机架内,机架内部带宽高。集群相互联通但是分布是物理的。

我们的应用通常在集群上执行管理系统,它调度任务以优化资源的分配,一些时候杀死实例或者移动他们到不同的机器。系统包括名称服务,因此实例可以通过逻辑名称引用实例,无关于当前绑定的物理机。持久化数据被存储在类似分布存储系统的文件上,GFS或者Bigtable,以及临时的数据如本地磁盘上的缓冲消息。

4.1 基础架构

只要知道了顶点ID和分区函数就能计算出顶点在哪个分区。

Pregel库划分了图到分区中,每个由一组顶点和这些顶点的出边组成。一个顶点对一个分区仅仅依赖于该顶点ID。这意味着它可以知道,给定的顶点属于哪个分区,即便该顶点属于不同的机器,或者即便顶点不存在了。默认的分区函数只是hash(ID) 然后模N,其中N是分区的数量,但是用户可以替换它。

给工作机器分配顶点是在pregel中分布不透明的地方。一些应用在默认的指派工作的很好,但是自定义赋值函数可以更好的使用图中的局部性。对于Web图启发式的方法是将表示同一站点的顶点放在一起。

没有错误的情况下,Pregel程序的执行包含几个状态:

  1. 用户程序的许多副本在集群机器上开始执行。其中一个副本充当主副本。它没有被分配到图的任何部分,但是要对集群进行协调。工作节点使用集群管理系统名称服务以发现本master的位置,并且发送注册信息给master。
  2. master确定图有多少个分区,指派一个分区或者多个分区到一个机器上。这个数量可以被用户控制。每个work有更多个分区可以实现分区之间的并行性和更好的均衡负载,通常会提升性能。每个工作节点负责维护它的图不分,执行用户compute()方法于他的顶点上,并管理器其他节点的收发。每个工作节点都被分配了所有工作节点的完整任务集合。
  3. master指派用户输入的一部分到工作节点上。输入被视为一组记录,每一组都包含任意数量的边和顶点。输入的划分是正交于图本身的,它通常基于文件边界。如果工作节点负载。如果工作节点加载了一个顶点该顶点属于工作节点所在的图的一部分,相应的数据结构(4.3章)会立即更新。否则,工作节点将消息排队发送给远程拥有该顶点的对等体。在输入完全结束加载后,所有的顶点被标记为激活。
  4. master命令每个工作节点执行超级步。工作节点遍历活动顶点,每个分区使用一个线程。工作节点调用每个活动顶点的compute(),传递在前一个超级步中发送的消息。消息是异步发送的,以实现计算、通信和批处理的重叠,但在超级步骤结束之前交付。当工作节完成时会相应给master,告诉master下一步有多少个顶点是被激活的。
    只要有任何顶点是活动的,或者信息在传输这个步骤就会重复。
  5. 在计算停止后,master可能命令每个工作节点保存它的部分图。

4.2 容错

容错通过检查点来进行。在超级步开始的时候,master命令工作节点保存它分区的状态以持久化存储,包括顶点值,边值,和到来的信息;master单独保存聚合器的值。如果ping节点在指定的间隔没有收到ping消息,任务处理终止。如果master没有收到work的返回,工作节点标记工作节点处理故障。

当一个或多个工作节点故障,分配给这些工作节点的分区状态将丢失。主服务器将图重新分配给当前可用的工作线集。并且在超级步S开始的时候从最近一次检查点钟重新加载分区状态。该检查点可能比故障发生前任何分区完成的最新超级步骤S'早了几个超级步骤。需要恢复缺失的超级步。我们选择检查点的频率是基于失败模型的平均时间的,平衡检查点的成本和恢复的成本。

记录所有发送出去的消息,让消息能进行复用

限制恢复正在开发中,以改善恢复的成本和延迟。除了基本的检查点,工作节点还在图加载和超级步骤期间记录来自其分配分区的传出消息。然后恢复仅限丢失分区,它是从检查点进行恢复的。使用记录的来自健康分区的和恢复分区中重新计算的信息,系统重新计算丢失的超级步直到S’。

这种方法通过只计算丢失的分区,来节省恢复期间的计算资源,并且能够提升恢复的延迟,因为每个工作节点 可能正在恢复更少的分区。保存输出信息增加了工作负载,但是通常机器有足够的磁盘带宽来确保I/O不会成为瓶颈。

受限恢复 要求用户算法是确定的,为了避免将原始执行中保存的消息与恢复中的新消息不一致。通过发送为随机数使得随机算法具有确定性的在一超级步和分区上。非确定的算法会禁用受限恢复,回到基本的恢复机制。

4.3 工作节点的实现

工作节点机器维护了图的一部分状态在内存中。概念上将这可以看错从顶点ID到顶点状态映射,每个顶点的状态包含了它当前的值,一个出边的列表(边的目标顶点ID,边的当前值),一个队列包含了传入的消息,和一个表示指定那个顶点是激活的。当工作节点执行超级步它循环通过所有的顶点然后调用compute(),它传递当前值,传如消息的迭代器,以及出边的迭代器。它无法访问到入边,因为每个如边是源顶点的一个列表,通常在不同的机器上。

由于性能原因,活动顶点表示被单独的存储在到来信息队列中。此外,虽然顶点和边只存在一个副本,存在活动顶点标志和传入消息队列的俩个副本:一个用于当前超级步,另一个用于下一个超级步。当工作节点在同事处理超级步S中的顶点时,它同时在另一个线程中接收来自执行相同超步骤的其他工作节点的消息。因为顶点接受前一个超级步中发送的消息(见2章),超级步S和S+1必须保持分开。类似的,顶点V的消息到达意味着V可能在下一个超级步中变得活跃,而不一定是当前的超级步。

当compute()请求发送信息给其他顶点,工作节点首先确定目标节点是自己有的还是远程工作节点的,或者拥有相同发送者的工作节点。在远程例子中,信息被交付的目标节点缓存。当缓冲大小达到了预制,最大的缓冲将会被异步刷盘,将每个消息作为单个网络消息传递给其目标工作节点。在本地的例子中,一个优化是有必要的:消息被直接防止在目标顶点传入的消息队列中。

如果用户提供了组合器(3.2),当消息添加到输出信息队列并且输入队列接受了它,将应用组合器。后者不会减少网络使用,但是减少空间需要以存储信息。

4.4 master的实现

master主要负责协调活动的工作节点。每个工作节点被分配一个唯一的标识符在它注册的时候。master维护所有现在已知的活动的工作节点的列表,包括工作节点的唯一标识符,它寻址信息,并且是已经分配图的一部分。master数据结构的大小是和分区数量成比例的,而不是顶点或边的数量,所以耽搁master可以协调计算即便是非常大的图。

大部分master的操作包括输入输出,计算以及从检查点保存和恢复,终止于俩个屏障处:master发送相同的请求到每一个工作节点,在操作开始时,master知道它还活着,并且等待每个工作节点的响应。如果任何工作节点故障了,master进入4.2章描述的恢复模式。如果屏障同步成功,master执行下一个阶段。例如,在计算屏障的情况下,master增加了全部的超级步索引,并且执行下一个超级步。

master也维护了关于计算和状态图的统计信息,比如图的总数,出度的直方图,活动顶点的熟练,最近超级步时间和消息流量,以及所有用户聚合器的值。以启用用户监控,master运行https服务展示这些信息。

4.5 聚合器

通过使用聚合函数在用户提供的一组值上,聚合器计算单个全局值。每个工作节点维护聚合器实例的集合,通过类型名和实例名进行定义。当工作节点为图的任意分区执行超级步,工作节点结合所有的值提供给聚合乘一个单独的本底值:对分区中所有的工作节点进行部分缩减的聚合器。在超级步的最后,工作节点形成一个树,将部分规约后的聚合器规约为全局值并且传送给master。我们使用基于树的规约方法,而不是工作节点链的流水线-在CPU进行规约的时候并行化调用。在下一个超级步骤开始时,master将全局值发送给所有worker。

5应用

6实验

7相关工作

8 结论和未来的工作

这个paper的贡献是建模了合适的大吃土图计算并且描述了它的生产质量,扩展性容错实现。基于来自用户的反馈,我们认为我们已经成功的让我们的模型变得好用和可用。许多Pregel应用已经被部署,更多的还在设计和实现中调整。用户上报了一旦它们切换到vertex式思考程序模型,API就符合直觉,弹性以及易用。这并不奇怪,因为我们一开始就与影响API的早起采用者合作。例如,聚合器被添加用来移出用户在早期Pregel模型发现的限制。其他可用性Pregel是的驱动来自用户的体验其中包括一组状态页面,其中包含有关Pregel程序进度的详细信息,一个单元测试框架,和一个单机器模型它有助于原型和快速调试。

pregel的性能扩展性和容错已经可以满足数十亿个顶点的图。我们正在研究缩放到更大的图形技术,比如放松模型的同步性以避免更快的工作节点需要频繁的等待超级步之间的屏障。

当前全部的计算状态都存在内存中。我们已经漏出了一些数据到本地磁盘,并将继续这个方向,当不能使用TB级别的内存时,以让允许大尺度图的计算。

分配顶点到机器以最小化机器内部的交流是一个挑战。如果拓扑与消息流量对应,基于拓扑的输入图的分区可能已经足够了,但也许不是。我们想要设计一个动态的重分区机制。

稀疏图是顶点较多边较少的图。

Pregel是为了稀疏图设计的,通信主要发生在边缘上,我们不期望关注于它的更改。尽管已经采取措施支持扇入和扇出流量,当大多数顶点不断的向大多数顶点发送消息时,性能将会受到影响。然后真实的稠密图是函件的,在稀疏图上进行密集通信的算法也是如此。一些算法可以转换成Pregel友好的变体,比如通过使用聚合器,接合器,或者拓扑改变,当然这样的计算对于任何高度分布式系统都是困难的。

一个实际的问题是,Pregel正在成为我们用户群的生产基础设施的一部分。我们不能在不考虑兼容性的情况下随意更改API。然而我们相信,程序接口是足够抽象和灵活的,能够适应系统的进一步发展。

Last modification:December 4, 2024
如果觉得我的文章对你有用,请随意赞赏