Advertisement

【Spark】(十八)Spark GraphX 中的 pregel函数

阅读量:

Pregel概述

Pregel是Google提出的用于大规模分布式图计算框架,常用来解决以下问题:

图遍历(BFS)

单源最短路径(SSSP)

PageRank计算

Pregel的计算由一系列迭代组成,称为supersteps。Pregel迭代过程 (实现过程)如下:

每个顶点从上一个superstep接收入站消息

计算顶点新的属性值

在下一个superstep中向相邻的顶点发送消息

当没有剩余消息时,迭代结束

源码参数分析

复制代码
      def pregel[A: ClassTag](
      initialMsg: A,
      maxIterations: Int = Int.MaxValue,
      activeDirection: EdgeDirection = EdgeDirection.Either)(
      vprog: (VertexId, VD, A) => VD,
      sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],
      mergeMsg: (A, A) => A)
    : Graph[VD, ED] = {
    Pregel(graph, initialMsg, maxIterations, activeDirection)(vprog, sendMsg, mergeMsg)
      }
    
    
      
      
      
      
      
      
      
      
      
      
    

相关参数
在这里插入图片描述
参数
在这里插入图片描述
注意事项:
在理解案例之前,首先要清楚关于顶点的两点知识:
1、顶点 的状态有两种:

  • 钝化态【类似于休眠,不做任何事】
  • 激活态【干活】

2、顶点 能够处于激活态需要满足以下任意条件:

  • 成功收到消息
  • 成功发送了任何一条消息

案例:求最短距离

顶点5到其他各顶点的最短距离

实现代码:

复制代码
    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.graphx._
    object PregeDemo {
      def main(args: Array[String]): Unit = {
    //1、创建SparkContext
    val conf = new SparkConf().setMaster("local[*]").setAppName("PregeDemo")
    val sc = new SparkContext(conf)
    
    //2、创建顶点
    val vertexArray = Array(
      (1L, ("Alice", 28)),
      (2L, ("Bob", 27)),
      (3L, ("Charlie", 65)),
      (4L, ("David", 42)),
      (5L, ("Ed", 55)),
      (6L, ("Fran", 50))
    )
    val vertexRDD = sc.makeRDD(vertexArray)
    
    //3、创建边,边的属性代表 相邻两个顶点之间的距离
    val edgeArray = Array(
      Edge(2L, 1L, 7),
      Edge(2L, 4L, 2),
      Edge(3L, 2L, 4),
      Edge(3L, 6L, 3),
      Edge(4L, 1L, 1),
      Edge(2L, 5L, 2),
      Edge(5L, 3L, 8),
      Edge(5L, 6L, 3)
    )
    val edgeRDD = sc.makeRDD(edgeArray)
    
    //4、创建图
    val graph = Graph(vertexRDD,edgeRDD)
    
    
    /* ************************** 使用pregle算法计算,顶点5 到各个顶点的最短距离 ************************** */
    
    //被计算的图中 起始顶点id
    val srcVertexId = 5L
    val initialGraph = graph.mapVertices {
      case (id, (name, age)) => {
        if (id == srcVertexId)
          0.0
        else
          Double.PositiveInfinity
      }
    }
    //    initialGraph.vertices.collect.foreach(println)
    
    //5、调用pregel
    val pregelGraph: Graph[Double, PartitionID] = initialGraph.pregel(
      Double.PositiveInfinity,
      Int.MaxValue,
      EdgeDirection.Out
    )(
      (vid: VertexId, vd: Double, distMsg: Double) => {
        val minDist = math.min(vd, distMsg)
        println(s"顶点${vid},属性${vd},收到消息${distMsg},合并后的属性${minDist}")
    //        println("vprog " + vid + " " + vd + " " + distMsg + " " + minDist)
        minDist
      },
      (edgeTriplet: EdgeTriplet[Double, PartitionID]) => {
        if (edgeTriplet.srcAttr + edgeTriplet.attr < edgeTriplet.dstAttr) {
          println(s"顶点${edgeTriplet.srcId} 给 顶点${edgeTriplet.dstId} 发送消息 ${edgeTriplet.srcAttr + edgeTriplet.attr}")
    //          println("sendMsg " + edgeTriplet.srcId + " " + edgeTriplet.srcAttr)
    //          println("sendMsg " + edgeTriplet.dstId + " " + edgeTriplet.dstAttr)
    //          println("sendMsg " + edgeTriplet.attr)
          Iterator[(VertexId, Double)]((edgeTriplet.dstId, edgeTriplet.srcAttr + edgeTriplet.attr))
        } else {
          Iterator.empty
        }
      },
      (msg1: Double, msg2: Double) => math.min(msg1, msg2)
    )
    
    //6、输出结果
    println("输出结果:")
    pregelGraph.triplets.collect.foreach(println)
    //    println(pregelGraph.vertices.collect.mkString("\n"))
    
    //7、关闭SparkContext
    sc.stop()
      }
    }
    
    
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
    

输出结果:

复制代码
    //------------------------------------------ 各个顶点接受初始消息initialMsg ------------------------------------------
    顶点5,属性0.0,收到消息Infinity,合并后的属性0.0
    顶点2,属性Infinity,收到消息Infinity,合并后的属性Infinity
    顶点3,属性Infinity,收到消息Infinity,合并后的属性Infinity
    顶点1,属性Infinity,收到消息Infinity,合并后的属性Infinity
    顶点6,属性Infinity,收到消息Infinity,合并后的属性Infinity
    顶点4,属性Infinity,收到消息Infinity,合并后的属性Infinity
    //------------------------------------------ 第一次迭代 ------------------------------------------
    顶点3 给 顶点6 发送消息 Infinity失败
    顶点5 给 顶点6 发送消息 3.0成功
    顶点2 给 顶点4 发送消息 Infinity失败
    顶点4 给 顶点1 发送消息 Infinity失败
    顶点5 给 顶点3 发送消息 8.0成功
    顶点2 给 顶点1 发送消息 Infinity失败
    顶点2 给 顶点5 发送消息 Infinity失败
    顶点3 给 顶点2 发送消息 Infinity失败
    
    顶点3,属性Infinity,收到消息8.0,合并后的属性8.0
    顶点6,属性Infinity,收到消息3.0,合并后的属性3.0//------------------------------------------ 第二次迭代 ------------------------------------------
    顶点3 给 顶点2 发送消息 12.0成功
    顶点3 给 顶点6 发送消息 11.0失败
    
    顶点2,属性Infinity,收到消息12.0,合并后的属性12.0//------------------------------------------ 第三次迭代 ------------------------------------------
    顶点2 给 顶点1 发送消息 19.0成功
    顶点2 给 顶点4 发送消息 14.0成功
    顶点2 给 顶点5 发送消息 14.0失败
    
    顶点4,属性Infinity,收到消息14.0,合并后的属性14.0
    顶点1,属性Infinity,收到消息19.0,合并后的属性19.0//------------------------------------------ 第四次迭代 ------------------------------------------
    顶点4 给 顶点1 发送消息 15.0成功
    
    顶点1,属性19.0,收到消息15.0,合并后的属性15.0//------------------------------------------ 第五次迭代不用发送消息 ------------------------------------------
    
    
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
    

pregel 最短路径过程分析:

调用pregel方法之前,先把图的各个顶点的属性初始化为如下图所示:顶点5到自己的距离为0,所以设为0,其他顶点都设为 正无穷大Double.PositiveInfinity。

1. 当调用pregel方法开始:

首先,所有顶点都将接收到一条初始消息initialMsg,使所有顶点都处于激活态(红色标识的节点)。
在这里插入图片描述
2. 第一次迭代开始:
所有顶点以EdgeDirection.Out的边方向调用sendMsg方法发送消息给目标顶点,如果 源顶点的属性+边的属性<目标顶点的属性,则发送消息。否则不发送。发送成功的只有两条边:
5—>3(0+8<Double.Infinity , 成功),
5—>6(0+3<Double.Infinity , 成功)
3—>2(Double.Infinity+4>Double.Infinity , 失败)
3—>6(Double.Infinity+3>Double.Infinity , 失败)
2—>1(Double.Infinity+7>Double.Infinity , 失败)
2—>4(Double.Infinity+2>Double.Infinity , 失败)
2—>5(Double.Infinity+2>Double.Infinity , 失败)
4—>1(Double.Infinity+1>Double.Infinity , 失败)。

sendMsg方法执行完成之后,根据顶点处于激活态的条件,顶点5 成功地分别给顶点3 和 顶点6 发送了消息,顶点3 和 顶点6 也成功地接受到了消息。所以 此时只有5,3,6 三个顶点处于激活态,其他顶点全部钝化。然后收到消息的顶点3和顶点6都调用vprog方法,将收到的消息 与 自身的属性合并。如下图所示。到此第一次迭代结束。

3. 第二次迭代开始:
顶点3 给 顶点6 发送消息失败,顶点3 给 顶点2 发送消息成功,此时 顶点3 成功发送消息,顶点2 成功接收消息,所以顶点2 和 顶点3 都成为激活状态,其他顶点都成为钝化状态。然后顶点2 调用vprog方法,将收到的消息 与 自身的属性合并。 下图所示至此第二次迭代结束。
在这里插入图片描述
3. 第二次迭代开始:
顶点3 给 顶点6 发送消息失败,顶点3 给 顶点2 发送消息成功,此时 顶点3 成功发送消息,顶点2 成功接收消息,所以顶点2 和 顶点3 都成为激活状态,其他顶点都成为钝化状态。然后顶点2 调用vprog方法,将收到的消息 与 自身的属性合并。 下图所示至此第二次迭代结束。
在这里插入图片描述
第四次迭代开始:
顶点2 分别发送消息给 顶点1失败 和 顶点4失败。顶点4 给 顶点1发送消息成功,顶点1 和 顶点4 进入激活状态,其他顶点进入钝化状态。顶点1 调用vprog方法,将收到的消息 与 自身的属性合并 。见图5
在这里插入图片描述
第五次迭代开始:
顶点4 再给 顶点1发送消息失败,顶点4 和 顶点1 进入钝化状态,此时全图都进入钝化状态。至此结束,见图6.
在这里插入图片描述
参考文献:
<>
<>

全部评论 (0)

还没有任何评论哟~