Advertisement

毕设第二周(GraphX环境搭建 GraphX API 以及对Pregel的熟悉)

阅读量:

GraphX环境搭建与API的熟悉

我在本地搭建了一个完整的GraphX运行环境,并成功运行了几项典型测试案例。关于该主题的相关信息,请参考官方文档以获取详细的技术说明。官方文档中列举了几项个人认为较为基础且重要的核心操作:

复制代码
    class Graph[VD, ED] {
    //这个是把图存成Table所需要的数据,上一个周报里面提到了
    val vertices: VertexRDD[VD]
    val edges: EdgeRDD[ED]
    val triplets: RDD[EDgeTrplet[VD, ED]]
    // 关于cache方面的function, 对优化方面有影响,需要编程的时候比较了解
    def persist(newLevel: StorageLevel = StorageLevel.MEMORY_ONLY): Graph[VD, ED]
    def cache(): Graph[VD, ED]
    // 图形变换方面的function(不改变图的structure, 只改变property)
    def mapVertices[VD2](map: (VertexID, VD) => VD2): Graph[VD2, ED]
    def mapEdges[ED2](map: Edge[ED] => ED2): Graph[VD, ED2]
    def mapEdges[ED2](map: (PartitionID, Iterator[Edge[ED]]) => Iterator[ED2]): Graph[VD, ED2]
    def mapTriplets[ED2](map: EdgeTriplet[VD, ED] => ED2): Graph[VD, ED2]
    def mapTriplets[ED2](map: (PartitionID, Iterator[EdgeTriplet[VD, ED]]) => Iterator[ED2])
    : Graph[VD, ED2]
    //Iterative graph-parallel computation, 主要是Pregel, 后面会再详细说
      def pregel[A](initialMsg: A, maxIterations: Int, activeDirection: EdgeDirection)(
      vprog: (VertexID, VD, A) => VD,
      sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexID,A)],
      mergeMsg: (A, A) => A)
    : Graph[VD, ED]
    }

Pregel框架的熟悉

在这一领域的相关内容中,我对该领域的研究工作已有较为深入的了解。

BSP

基于 BSP 原则设计了其高层组织架构。BSP 的核心理念在于将计算任务划分为多个超步(sup erstep),每个超步内部按照严格的同步机制协同工作。在这些超步之间通过消息传递实现状态更新与同步协调。这种并行计算模型特别适合分布式系统环境下的大规模数据处理与分析任务。

  1. 首先,在每一个processor中执行本地计算。
  2. 然后完成processor之间的通信。
  3. 最后一步使用一个barrier来确保同步。
    其中每个barrier既是上一步superstep的结束点也是下一步superstep的起始点。
BSP

Pregel

与BSP相似地,在Pregel框架中将一系列迭代称为超级步骤(supérbêta)。在每个超级步骤(step)中,默认情况下,默认情况下,默认情况下,默认情况下,默认情况下,默认情况下),框架会为每个节点V调用一个用户自定义函数(UDF)。这个UDF不仅可以接收来自上一超级步骤(S-1)的消息,并在下一超级步骤(S+1)时向其他节点发送消息;还可以修改该节点V的状态以及其发出的所有边(outgoing edges)。需要注意的是,在Pregel框架中,默认情况下消息仅限于相邻节点;然而,在某些高级实现中允许消息发送到任意已知ID的目标。

整个过程停止的依据是每一个节点都volt to halt,节点状态图如下:

vertex state machine

Shortest Path在GraphX上的实现

在graphx/main/src/lib/ShortestPaths.scala项目中开发了任意两点间的最短路径计算功能,并依赖于Pregel框架。

复制代码
    object ShortestPaths {
      type SPMap = Map[VertexId, Int]
    
      private def makeMap(x: (VertexId, Int)*) = Map(x: _*)
    
      private def incrementMap(spmap: SPMap): SPMap = spmap.map { case (v, d) => v -> (d + 1) }
    
      private def addMaps(spmap1: SPMap, spmap2: SPMap): SPMap =
    (spmap1.keySet ++ spmap2.keySet).map {
      k => k -> math.min(spmap1.getOrElse(k, Int.MaxValue), spmap2.getOrElse(k, Int.MaxValue))
    }.toMap
    
    
      def run[VD, ED: ClassTag](graph: Graph[VD, ED], landmarks: Seq[VertexId]): Graph[SPMap, ED] = {
    val spGraph = graph.mapVertices { (vid, attr) =>
      if (landmarks.contains(vid)) makeMap(vid -> 0) else makeMap()
    }
    
    val initialMessage = makeMap()
    
    def vertexProgram(id: VertexId, attr: SPMap, msg: SPMap): SPMap = {
      addMaps(attr, msg)
    }
    
    def sendMessage(edge: EdgeTriplet[SPMap, _]): Iterator[(VertexId, SPMap)] = {
      val newAttr = incrementMap(edge.dstAttr)
      if (edge.srcAttr != addMaps(newAttr, edge.srcAttr)) Iterator((edge.srcId, newAttr))
      else Iterator.empty
    }
    
    Pregel(spGraph, initialMessage)(vertexProgram, sendMessage, addMaps)
      }
    }

由于采用Scala语言编写该代码项目,在设计上确实呈现出简洁明了的特点。然而代码也具有较高的复杂度和抽象性,在这种情况下进行初步分析是一个合理的选择。

复制代码
     Pregel(spGraph, initialMessage)(vertexProgram, sendMessage, addMaps)

该段采用了Pregel函数来处理任务,并通过三个关键参数进行配置:顶点程序(每个节点执行的操作)、消息发送(传递的具体内容),以及消息聚合(如何合并消息)。

复制代码
     def vertexProgram(id: VertexId, attr: SPMap, msg: SPMap): SPMap = {
      addMaps(attr, msg)
    }

vertexProgram里面执行的操作只有一步,就是addMaps,

复制代码
    (spmap1.keySet ++ spmap2.keySet).map {
      k => k -> math.min(spmap1.getOrElse(k, Int.MaxValue), spmap2.getOrElse(k, Int.MaxValue))
    }.toMap

即将每个节点当前的信息(存储于该节点到特定标记物的最近距离)与message中所记录的距离进行比较,并将其更新为更小的那个距离值。

复制代码
    def sendMessage(edge: EdgeTriplet[SPMap, _]): Iterator[(VertexId, SPMap)] = {
      val newAttr = incrementMap(edge.dstAttr)//新的距离是老的距离加1(无权图)
      if (edge.srcAttr != addMaps(newAttr, edge.srcAttr)) Iterator((edge.srcId, newAttr))//类似Dijkstra的更新方式
      else Iterator.empty
    }

总结:

这一周的主要任务是熟悉GraphX环境。通过研读GraphX的部分示例代码,并结合官方文档的学习,在理解这些代码时略感吃力。不过没关系,在接下来的一周时间里我打算利用时间深入学习Scala中关于Map的操作符相关内容,并特别关注其运算符实现细节。以便更好地解析GraphX源码。

全部评论 (0)

还没有任何评论哟~