Advertisement

基于Spark GraphX的图形数据分析

阅读量:

目录

  • 1、为什么需要图计算

  • 2、图的概念

    • 2.1 图的基本概念及应用场景

    • 2.2 图的术语

      • 2.2.1 顶点(Vertex)和边(Edge)
      • 2.2.2 有向图和无向图
      • 2.2.3 有环图和无环图
      • 2.2.4 度
    • 2.3 图的经典表示法

  • 3、Spark GraphX

    • 3.1 简介
    • 3.2 GraphX核心抽象
    • 3.3 GraphX API
    • 3.4 图的算子
      • 3.4.1 属性算子(数据变换)
      • 3.4.2 结构算子(结构变换)
      • 3.4.3 join算子
      • 3.4.4 练习:找出用户粉丝数量
  • 4、常用图算法

    • 4.1 页面排名算法--PageRank
    • 4.2 连通分量--Connected Component
  • 5、Pregel计算框架

1、为什么需要图计算

  • 许多大数据以大规模图或网络的形式呈现
  • 许多非图结构的大数据,常会被转换为图模型进行分析
  • 图数据结构很好地表达了数据之间的关联性

2、图的概念

2.1 图的基本概念及应用场景

图是由顶点集合(vertex)及顶点间的关系集合(边edge)组成的一种网状数据结构

  • 通常表示为二元组:Gragh=(V,E)
  • 可以对事物之间的关系建模

应用场景

  • 在地图应用中寻找最短路径
  • 社交网络关系
  • 网页间超链接关系

2.2 图的术语

2.2.1 顶点(Vertex)和边(Edge)

一般关系图中,事物为顶点,关系为边
定义一个图:

复制代码
    Graph=(V,E)
    集合V={v1,v2,v3}
    集合E={(v1,v2),(v1,v3),(v2,v3)}
    
    
      
      
      
    
在这里插入图片描述

2.2.2 有向图和无向图

  • 有向图 :在有向图中,一条边的两个顶点一般扮演者不同的角色,比如父子关系、页面A连接向页面B;
复制代码
    G=(V,E)
    V={A,B,C,D,E}
    E={<A,B>,<B,C>,<B,D>,<C,E>,<D,A>,<E,D>}  //关系用尖括号表示
    
    
      
      
      
    
在这里插入图片描述
  • 无向图 :在一个无向图中,边没有方向,即关系都是对等的,比如qq中的好友。
复制代码
    G=(V,E)
    V={A,B,C,D,E}
    E={(A,B),(A,D),(B,C),(B,D),(C,E),(D,E)}
    
    
      
      
      
    
在这里插入图片描述

2.2.3 有环图和无环图

  • 有环图 :包含一系列顶点连接的回路(环路),有环图是包含循环的,一系列顶点连接成一个环,在有环图中,如果不关心终止条件,算法可能永远在环上执行,无法退出。
    在这里插入图片描述

  • 无环图 :不包含循环,不能形成环,DAG即为有向无环图
    在这里插入图片描述

2.2.4 度

指一个顶点所有边的数量。

  • 出度:指从当前顶点指向其他顶点的边的数量
  • 入度:其他顶点指向当前顶点的边的数量
    在这里插入图片描述

2.3 图的经典表示法

邻接矩阵:
在这里插入图片描述
1、对于每条边,矩阵中相应单元格值为1
2、对于每个循环,矩阵中相应单元格值为2,方便在行或列上求得顶点度数

3、Spark GraphX

3.1 简介

GraphX是Spark提供分布式图计算API

GraphX特点:

复制代码
* 基于内存实现了数据的复用与快速读取
* 通过弹性分布式属性图(Property Graph)统一了图视图与表视图
* 与Spark Streaming、Spark SQL和Spark MLlib等无缝衔接

针对某些领域,如社交网络、语言建模等,graph-parallel系统可以高效地执行复杂的图形算法,比一般的data-parallel系统更快

Graphx是将graph-parallel的data-parallel统一到一个系统中。允许用户将数据当成一个图或一个集合RDD,而简化数据移动或复杂操作。

3.2 GraphX核心抽象

弹性分布式属性图(Resilient Distributed Property Graph)

复制代码
* 顶点和边都带属性的有向多重图

在这里插入图片描述
在这里插入图片描述

  • 一份物理存储,两种视图
    在这里插入图片描述
    对Graph视图的所有操作,最终都会转换成其关联的Table视图的RDD操作来完成。

3.3 GraphX API

  • Graph[VD,ED]
  • VertexRDD[VD]
  • EdgeRDD[ED]
  • EdgeTriplet[VD,ED]
  • Edge:样例类
  • VertexId:Long的别名

maven工程需要下载依赖:

复制代码
    <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-graphx_2.11</artifactId>
    <version>2.1.1</version>
    </dependency>
    
    
      
      
      
      
      
    
复制代码
    import org.apache.spark.graphx.{Edge, Graph}
    import org.apache.spark.rdd.RDD
    import org.apache.spark.{SparkConf, SparkContext}
    
    object GraphxDemo1 {
      def main(args: Array[String]): Unit = {
    val conf :SparkConf= new SparkConf().setAppName("graphxDemo1").setMaster("local[2]")
    val sc = SparkContext.getOrCreate(conf)
    
    //创建vertices顶点rdd,(1L,1)中的1L代表顶点,1代表该顶点的属性
    val vd:RDD[(Long,Int)]=sc.makeRDD(Seq((1L,1),(2L,2),(3L,3)))
    
    //创建edges边rdd,(Edge(1L,2L,1)中的1L和2L代表顶点,1代表两顶点间的关系
    val ed:RDD[Edge[Int]]=sc.makeRDD(Seq(Edge(1L,2L,1),Edge(2L,3L,2)))
    
    //创建graph对象
    val graph=Graph(vd,ed)
    
    //获取graph图对象的顶点信息
    graph.vertices.collect.foreach(println)
    graph.vertices.foreach(x=>println(s"${x._1}-->${x._2}"))
    //获取graph图对象的边信息
    graph.edges.collect.foreach(println)
    graph.edges.foreach(x=>println(s"src:${x.srcId},dst:${x.dstId},attr:${x.attr}"))
    //获取顶点和边的整体信息
    graph.triplets.collect.foreach(println)
      }
    }
    /*
    (2,2)
    (1,1)
    (3,3)
    1-->1
    3-->3
    2-->2
    Edge(1,2,1)
    Edge(2,3,2)
    src:1,dst:2,attr:1
    src:2,dst:3,attr:2
    ((1,1),(2,2),1)
    ((2,2),(3,3),2)
    */
    
    
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
    

关于vertices、edges、triplets所表示的含义如下图:
在这里插入图片描述

Spark shell需要导入Spark Graph包

复制代码
    //导入Spark Graph包
    scala> import org.apache.spark.graphx._
    
    //通过文件加载
    followers.txt内容是
    2 3
    1 4
    3 2
    4 3
    
    scala> val graphLoad=GraphLoader.edgeListFile(sc,"file:///root/test/followers.txt")
    graphLoad: org.apache.spark.graphx.Graph[Int,Int] = org.apache.spark.graphx.impl.GraphImpl@6d0c8cd0
    
    scala> graphLoad.vertices.collect
    res6: Array[(org.apache.spark.graphx.VertexId, Int)] = Array((4,1), (1,1), (3,1), (2,1))
    
    scala> graphLoad.edges.collect
    res7: Array[org.apache.spark.graphx.Edge[Int]] = Array(Edge(1,4,1), Edge(2,3,1), Edge(3,2,1), Edge(4,3,1))
    
    scala> graphLoad.triplets.collect
    res8: Array[org.apache.spark.graphx.EdgeTriplet[Int,Int]] = Array(((1,1),(4,1),1), ((2,1),(3,1),1), ((3,1),(2,1),1), ((4,1),(3,1),1))
    
    
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
    

属性图应用示例:构建用户合作关系属性图

  • 顶点属性:用户名、职业
  • 边属性:合作关系
    在这里插入图片描述
复制代码
    import org.apache.spark.graphx.{Edge, Graph}
    import org.apache.spark.rdd.RDD
    import org.apache.spark.{SparkConf, SparkContext}
    
    object GraphDemo2 {
      def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[1]").setAppName("graphDemo2")
    
    val sc = SparkContext.getOrCreate(conf)
    
    val user:RDD[(Long,(String,String))] = sc.parallelize(Array((3L,("rxin","student")),(7L,("jgonzal","postdoc")),(5L,("franklin","professor")),(2L,("istoica","professor"))))
    
    val relationship:RDD[Edge[String]]=sc.parallelize(Array(Edge(3L,7L,"Collaborator"),Edge(5L,3L,"Advisor"),Edge(2L,5L,"Colleague"),Edge(5L,7L,"PI")))
    
    val graphUser=Graph(user,relationship)
    
    graphUser.vertices.collect.foreach(println)
    graphUser.edges.collect.foreach(println)
    graphUser.triplets.collect.foreach(println)
      }
    }
    /*
    (3,(rxin,student))
    (7,(jgonzal,postdoc))
    (5,(franklin,professor))
    (2,(istoica,professor))
    Edge(2,5,Colleague)
    Edge(3,7,Collaborator)
    Edge(5,3,Advisor)
    Edge(5,7,PI)
    ((2,(istoica,professor)),(5,(franklin,professor)),Colleague)
    ((3,(rxin,student)),(7,(jgonzal,postdoc)),Collaborator)
    ((5,(franklin,professor)),(3,(rxin,student)),Advisor)
    ((5,(franklin,professor)),(7,(jgonzal,postdoc)),PI)
    */
    
    
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
    

属性图应用示例:

  • 构建用户社交网络关系

    • 顶点:用户名、年龄
    • 边:打call次数
  • 找出大于30岁的用户

  • 假设打call超过5次,表示真爱。请找出他(她)们
    在这里插入图片描述

复制代码
    import org.apache.spark.graphx.{Edge, EdgeTriplet, Graph}
    import org.apache.spark.rdd.RDD
    import org.apache.spark.{SparkConf, SparkContext}
    
    object GraphDemo3 {
      def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[1]").setAppName("graphDemo3")
    
    val sc = SparkContext.getOrCreate(conf)
    //构建用户社交网络关系
    val user:RDD[(Long,(String,Int))]  = sc.parallelize(
      Array(
        (1L, ("Alice", 28)),
        (2L, ("Bob", 27)),
        (3L, ("Charlie", 65)),
        (4L, ("David", 42)),
        (5L, ("Ed", 55)),
        (6L, ("Fran", 50))
      ))
    
    val userCall:RDD[Edge[Int]] = sc.parallelize(
      Array(
        Edge(2L, 1L, 7),
        Edge(4L, 1L, 1),
        Edge(5L, 2L, 2),
        Edge(3L, 2L, 4),
        Edge(5L, 3L, 8),
        Edge(2L, 4L, 2),
        Edge(5L, 6L, 3),
        Edge(3L, 6L, 7)
      ))
    
    val graphUserCall=Graph(user,userCall)
    
    //找出大于30岁的用户
    //方法1:利用元组,(1L, ("Alice", 28))是二维元组,
    graphUserCall.vertices.filter(x=>x._2._2>30).collect().foreach(println)
    //方法2:利用样例类
    graphUserCall.vertices.filter{case(id,(name,age))=>age>30}.collect().foreach(println)
    
    //假设打call超过5次,表示真爱。请找出,格式:*** like ***,stage:*
    
    // graphUserCall.triplets.collect().foreach(println)的输出格式是((2,(Bob,27)),(1,(Alice,28)),7)
    //看着像三元组,实际并不是,它的类型是RDD[EdgeTriplet[(String,Int),Int]]
    //val triplets:RDD[EdgeTriplet[(String,Int),Int]] = graphUserCall.triplets
    //所以千万不能用三元组处理!
    
    //可以使用算子srcAttr,dstAttr,attr
    //((2,(Bob,27)),(1,(Alice,28)),7)中,
    // srcAttr代表(Bob,27),dstAttr代表(Alice,28),attr代表7
    //而(Bob,27),(Alice,28)是二元组,这样就好解决了。
    graphUserCall.triplets.filter(x=>x.attr>5).collect()
      .foreach(x=>println(x.srcAttr._1+" like "+x.dstAttr._1+",stage:"+x.attr))
      }
    }
    /*输出:
    (4,(David,42))
    (6,(Fran,50))
    (3,(Charlie,65))
    (5,(Ed,55))
    Bob like Alice,stage:7
    Charlie like Fran,stage:7
    Ed like Charlie,stage:8
    */
    
    
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
    

查看图信息

  • 顶点数量:val numVertices: Long
  • 边数量:val numEdges: Long
  • 度:val degrees: VertexRDD[Int]
    • 入度:val inDegrees: VertexRDD[Int]
    • 出度:val outDegrees: VertexRDD[Int]
复制代码
    //使用上述的示例
     println(graphUserCall.numVertices)  	//输出:6
     println(graphUserCall.numEdges)		//输出:8
     graphUserCall.degrees.collect().foreach(println) 	//输出:(4,2)(1,2)(6,2)(3,3)(5,3)(2,4)
     graphUserCall.inDegrees.collect().foreach(println)	//输出:(4,1)(1,2)(6,2)(3,1)(2,2)
     graphUserCall.outDegrees.collect().foreach(println)//输出:(4,1)(3,2)(5,3)(2,2)
    
    
      
      
      
      
      
      
    

3.4 图的算子

3.4.1 属性算子(数据变换)

用于属性图属性数据变换

  • 类似于RDD的map操作
    • mapVertices–>修改顶点的属性
    • mapEdges–>修改边的关系(也只是改变值,不改变结构)
    • mapTriplets
复制代码
    class Graph[VD, ED] {
      def mapVertices[VD2](map: (VertexId, VD) => VD2): Graph[VD2, ED]
      def mapEdges[ED2](map: Edge[ED] => ED2): Graph[VD, ED2]
      def mapTriplets[ED2](map: EdgeTriplet[VD, ED] => ED2): Graph[VD, ED2]
    }
    
    
      
      
      
      
      
    

使用上述示例:用户社交网络关系(打call),这次用spark-shell进行操作。

复制代码
    //导包
    scala> import org.apache.spark.graphx._
    
    //创建顶点RDD
    scala> val user=sc.parallelize(Array(
     | (1L,("Alice",28)),
     | (2L,("Bob",27)),
     | (3L,("Charlie",65)),
     | (4L,("David",42)),
     | (5L,("Ed",55)),
     | (6L,("Age",50))))
    
    //创建边RDD
    scala> val userCall=sc.parallelize(
     | Array(
     | Edge(4L,1L,1),
     | Edge(2L,1L,7),
     | Edge(5L,2L,2),
     | Edge(3L,2L,4),
     | Edge(5L,3L,8),
     | Edge(2L,4L,2),
     | Edge(5L,6L,3),
     | Edge(3L,6L,3)))
    
    
    //创建graph对象
    scala> val graphUserCall=Graph(user,userCall)
    
    
    scala> graphUserCall.vertices.collect.foreach(println)
    (4,(David,42))
    (1,(Alice,28))
    (6,(Age,50))
    (3,(Charlie,65))
    (5,(Ed,55))
    (2,(Bob,27))
    
    //获取graph图对象的顶点信息
    scala> graphUserCall.vertices.collect.foreach(println)
    (4,(David,42))
    (1,(Alice,28))
    (6,(Age,50))
    (3,(Charlie,65))
    (5,(Ed,55))
    (2,(Bob,27))
    
    //现需要改变顶点属性
    //方法1:使用模式匹配
    scala> val t1_graph=graphUserCall.mapVertices{case(vertextId,(name,age))=>(vertextId,name)}
    
    scala> t1_graph.vertices.collect.foreach(println)
    (4,(4,David))
    (1,(1,Alice))
    (6,(6,Age))
    (3,(3,Charlie))
    (5,(5,Ed))
    (2,(2,Bob))
    
    //方法2:使用元组
    scala> val t2_graph=graphUserCall.mapVertices((id,attr)=>(id,attr._1))
    
    scala> t2_graph.vertices.collect.foreach(println)
    (4,(4,David))
    (1,(1,Alice))
    (6,(6,Age))
    (3,(3,Charlie))
    (5,(5,Ed))
    (2,(2,Bob))
    
    //获取graph图对象的边信息
    scala> graphUserCall.edges.collect.foreach(println)
    Edge(2,1,7)
    Edge(2,4,2)
    Edge(3,2,4)
    Edge(3,6,3)
    Edge(4,1,1)
    Edge(5,2,2)
    Edge(5,3,8)
    Edge(5,6,3)
    
    //需求:将打call次数乘以7
    scala> val t3_graph=graphUserCall.mapEdges(x=>Edge(x.srcId,x.dstId,x.attr*7.0))
       
    scala> t3_graph.edges.collect.foreach(println)
    Edge(2,1,Edge(2,1,49.0))
    Edge(2,4,Edge(2,4,14.0))
    Edge(3,2,Edge(3,2,28.0))
    Edge(3,6,Edge(3,6,21.0))
    Edge(4,1,Edge(4,1,7.0))
    Edge(5,2,Edge(5,2,14.0))
    Edge(5,3,Edge(5,3,56.0))
    Edge(5,6,Edge(5,6,21.0)) 
    //显然这样不是我们想要的,mapEdges(x=>...)里的x是指的是打call次数,而不是Edge(2,1,7)整体
    
    scala> val t3_graph=graphUserCall.mapEdges(x=>x.attr*7.0)
    
    scala> t3_graph.edges.collect.foreach(println)
    Edge(2,1,49.0)
    Edge(2,4,14.0)
    Edge(3,2,28.0)
    Edge(3,6,21.0)
    Edge(4,1,7.0)
    Edge(5,2,14.0)
    Edge(5,3,56.0)
    Edge(5,6,21.0)
    
    //需求:打call次数加上出度顶点的年龄,使用mapTriplets
    scala> val t4_graphUserCall=graphUserCall.mapTriplets(x=>(x.attr+x.srcAttr._2))
    
    scala> t4_graphUserCall.triplets.collect.foreach(println)
    ((2,(Bob,27)),(1,(Alice,28)),34)
    ((2,(Bob,27)),(4,(David,42)),29)
    ((3,(Charlie,65)),(2,(Bob,27)),69)
    ((3,(Charlie,65)),(6,(Age,50)),68)
    ((4,(David,42)),(1,(Alice,28)),43)
    ((5,(Ed,55)),(2,(Bob,27)),57)
    ((5,(Ed,55)),(3,(Charlie,65)),63)
    ((5,(Ed,55)),(6,(Age,50)),58)
    
    /*总结一下:
    1、mapVertices,修改的是顶点的属性VD
    2、mapEdges,修改的是边的关系ED
    3、mapTriplets,修改的是边的关系ED
    */
    
    
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
    

注意:

  • 这里每一个操作产生一个新图,其顶点和边被用户定义的map函数修改了。
  • 在每一个实例图结构不受影响,仅仅是属性图属性数据变换。

3.4.2 结构算子(结构变换)

属性图结构变换

  • reverse:改变边的方向
  • subgraph:生成满足顶点和边的条件的子图
复制代码
    class Graph[VD, ED] {
      def reverse: Graph[VD, ED]
      def subgraph(epred: EdgeTriplet[VD,ED] => Boolean,
               vpred: (VertexId, VD) => Boolean): Graph[VD, ED]
      }
    
    
      
      
      
      
      
    

此处注意epred和vpred的区别 ,下面演示示例详细说明。

复制代码
    scala> graphUserCall.triplets.collect.foreach(println)
    ((2,(Bob,27)),(1,(Alice,28)),7)
    ((2,(Bob,27)),(4,(David,42)),2)
    ((3,(Charlie,65)),(2,(Bob,27)),4)
    ((3,(Charlie,65)),(6,(Age,50)),3)
    ((4,(David,42)),(1,(Alice,28)),1)
    ((5,(Ed,55)),(2,(Bob,27)),2)
    ((5,(Ed,55)),(3,(Charlie,65)),8)
    ((5,(Ed,55)),(6,(Age,50)),3)
    //reverse 改变边的方向
    scala> val reverse_graphUserCall=graphUserCall.reverse
    
    scala> reverse_graphUserCall.triplets.collect.foreach(println)
    ((1,(Alice,28)),(2,(Bob,27)),7)
    ((1,(Alice,28)),(4,(David,42)),1)
    ((2,(Bob,27)),(3,(Charlie,65)),4)
    ((2,(Bob,27)),(5,(Ed,55)),2)
    ((3,(Charlie,65)),(5,(Ed,55)),8)
    ((4,(David,42)),(2,(Bob,27)),2)
    ((6,(Age,50)),(3,(Charlie,65)),3)
    ((6,(Age,50)),(5,(Ed,55)),3)
    
    //subgraph之vpred
    //vpred: (VertexId, VD) => Boolean  
    //注意匿名函数的参数是顶点信息,也就意味着会每个顶点进行判断
    //需求:截取年龄小于65的子图(包括所有顶点)
    scala> graphUserCall.subgraph(vpred=(id,attr)=>attr._2<65).triplets.collect.foreach(println)
    ((2,(Bob,27)),(1,(Alice,28)),7)
    ((2,(Bob,27)),(4,(David,42)),2)
    ((4,(David,42)),(1,(Alice,28)),1)
    ((5,(Ed,55)),(2,(Bob,27)),2)
    ((5,(Ed,55)),(6,(Age,50)),3)
    
    //subgraph之epred
    //epred: EdgeTriplet[VD,ED] => Boolean  
    //注意匿名函数的参数是顶点和边的整体信息,如果需要过滤顶点信息,需要使用srcAttr和dstAttr
    //需求:截取年龄小于65的子图(仅过滤出度顶点,也就是起始顶点)
    scala> graphUserCall.subgraph(epred=(ep)=>ep.srcAttr._2<65).triplets.collect.foreach(println)
    ((2,(Bob,27)),(1,(Alice,28)),7)
    ((2,(Bob,27)),(4,(David,42)),2)
    ((4,(David,42)),(1,(Alice,28)),1)
    ((5,(Ed,55)),(2,(Bob,27)),2)
    ((5,(Ed,55)),(3,(Charlie,65)),8) //比vpred多了此条
    ((5,(Ed,55)),(6,(Age,50)),3)
    
    
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
    

3.4.3 join算子

从外部的RDDs加载数据,修改顶点属性。

  • joinVertices
  • outerJoinVertices
复制代码
    class Graph[VD, ED] {
      def joinVertices[U](table: RDD[(VertexId, U)])(map: (VertexId, VD, U) => VD): Graph[VD, ED]
      def outerJoinVertices[U, VD2](table: RDD[(VertexId, U)])(map: (VertexId, VD, Option[U]) => VD2)
    : Graph[VD2, ED]
    }
    
    
      
      
      
      
      
    
复制代码
    //仍使用打call的示例
    //新创建一个RDD
    scala> val two=sc.makeRDD(Array((1L,"kgc.cn"),(2L,"qq.com"),(3L,"163.com"),(7L,"so.com")))
    
    //joinVertices
    //用法:graph.joinVertices(RDD)
    scala> graphUserCall.joinVertices(two)((id,v,cmpy)=>(v._1+"@"+cmpy,v._2)).vertices.collect.foreach(println)
    (4,(David,42))
    (1,(Alice@kgc.cn,28))
    (6,(Age,50))
    (3,(Charlie@163.com,65))
    (5,(Ed,55))
    (2,(Bob@qq.com,27))
    
    //outerJoinVertices
    //用法:graph.outerJoinVertices(RDD)
    scala> graphUserCall.outerJoinVertices(two)((id,v,cmpy)=>(v._1+"@"+cmpy,v._2)).vertices.collect.foreach(println)
    (4,(David@None,42))   //RDD中的顶点不匹配时,值为None
    (1,(Alice@Some(kgc.cn),28))
    (6,(Age@None,50))
    (3,(Charlie@Some(163.com),65))
    (5,(Ed@None,55))
    (2,(Bob@Some(qq.com),27))
    
    
    
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
    

属性算子mapVertices可以修改顶点的属性,join算子也可以修改顶点的属性,区别是join算子是从外部的RDDs加载数据。

需求:计算用户粉丝数量

复制代码
    scala> case class User(name:String,age:Int,inDeg:Int,outDeg:Int)
    
    //将顶点入度、出度存入顶点属性中
    scala> val t2_graph=graphUserCall.outerJoinVertices(graphUserCall.inDegrees){case(id,u,indeg)=>User(u._1,u._2,indeg.getOrElse(0),0)}.outerJoinVertices(graphUserCall.outDegrees){case(id,u,outdeg)=>User(u.name,u.age,u.inDeg,outdeg.getOrElse(0))}
    
    scala> t2_graph.vertices.collect.foreach(println)
    (4,User(David,42,1,1))
    (1,User(Alice,28,2,0))
    (6,User(Age,50,2,0))
    (3,User(Charlie,65,1,2))
    (5,User(Ed,55,0,3))
    (2,User(Bob,27,2,2))
    
    
      
      
      
      
      
      
      
      
      
      
      
      
    

3.4.4 练习:找出用户粉丝数量

需求:现有数据格式如下,创建图并计算每个用户的粉丝数量。
格式:((User*, ),(User ,*))

  • (User*, *)=(用户名,用户ID)
  • 第一个用户表示被跟随者(followee)
  • 第二个用户表示跟随者(follower)
    在这里插入图片描述
复制代码
    import org.apache.spark.graphx.{Edge, Graph}
    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.SparkSession
    import org.apache.spark.{SparkConf, SparkContext}
    
    import scala.util.matching.Regex
    
    object Test01 {
      def main(args: Array[String]): Unit = {
    // 创建SparkSession
    val spark: SparkSession = SparkSession.builder()
      .appName(this.getClass.getName)
      .master("local[1]")
      .getOrCreate()
    // 创建SparkContext
    val sc: SparkContext = spark.sparkContext
    // 编写正则表达式, 用于提取字段
    val pattern: Regex =
      """\(\((User\d+,\d+)\),\((User\d+,\d+)\)\)""".r
    // 读取数据文件
    val twitters: RDD[(Array[String], Array[String])] = sc.textFile("D:\ 上课PPT\ 直播课资料\ 2-Spark阶段\ 02-spark\ twitter_graph_data.txt")
      // 正则表达式的模式匹配, 返回一个Option数据类型
      .map(line => line match {
      case pattern(followee, follower) => (Some(followee), Some(follower))
      case _ => (None, None)
      // 过滤掉值为None的
    }).filter(x => x._1 != None && x._2 != None)
      // 正则表达式模式匹配后返回两组值, 分别对这两组值进行切割处理, 返回一个(Array[String], Array[String])
      .map(x => (x._1.get.split(","), x._2.get.split(",")))
    // 使用flatMap对数据进行扁平化处理, 构建点集合
    // 也可以使用union对两个RDD求并集, 这里不做演示
    val verts: RDD[(Long, String)] = twitters.flatMap(x => Array((x._1(1).toLong, x._1(0)), (x._2(1).toLong, x._2(0)))).distinct()
    // 构建边集合
    val edges: RDD[Edge[String]] = twitters.map(x => Edge(x._2(1).toLong, x._1(1).toLong, "follow"))
    // 构建图
    // 有可能会出现一种情况, 在边集合中出现的点在点集合中不存在
    val graph = Graph(verts,edges,"")
    // 求入度, 按照入度的值进行倒序排列
    // 注意: 要想实现全局有序, 需要重新分成一个区
    graph.inDegrees.repartition(1).sortBy(x=>x._2,false).foreach(println)
      }
    }
    /*输出结果:
    (123004655,56)
    (36851222,56)
    (59804598,54)
    (63644892,46)
    (14444530,42)
    ......
    ./
    
    
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
    

4、常用图算法

4.1 页面排名算法–PageRank

  • 用于评估网页链接的质量和数量,以确定该网页的重要性和权威性的相对分数,范围为0到10
  • 从本质上讲,PageRank是找出图中顶点(网页链接)的重要性
  • GraphX提供了PageRank API用于计算图的PageRank

原理部分详见:PageRank算法原理剖析及Spark实现

复制代码
    class Graph[VD, ED] {
      def pageRank(tol: Double, resetProb: Double = 0.15): Graph[Double, Double]
    }
    //tol:收敛时允许的误差,越小越精确, 确定迭代是否结束的参数
    //resetProb:随机重置概率
    //返回的仍是Graph,格式:(1,1.5453030618621122),本质上是找出顶点的重要性。
    
    
      
      
      
      
      
      
    
复制代码
    //使用不同的收敛误差,结果进行对比
    scala> graphUserCall.pageRank(0.1).vertices.sortBy(x=> -(x._2)).collect.foreach(println)
    (1,1.5453030618621122)
    (4,1.035409289731306)
    (6,1.0247865028119143)
    (2,1.0247865028119143)
    (3,0.7698396167465112)
    (5,0.5998750260362425)
    
    scala> graphUserCall.pageRank(0.01).vertices.sortBy(x=> -(x._2)).collect.foreach(println)
    (1,1.7757164399923602)
    (6,1.0009207604397985)
    (2,1.0009207604397985)
    (4,0.9727164143364966)
    (3,0.7024005336419639)
    (5,0.5473250911495823)
    
    scala> graphUserCall.pageRank(0.001).vertices.sortBy(x=> -(x._2)).collect.foreach(println)
    (1,1.7924127957615186)
    (6,0.9969646507526428)
    (2,0.9969646507526428)
    (4,0.9688717814927128)
    (3,0.6996243163176442)
    (5,0.5451618049228396)
    
    scala> graphUserCall.pageRank(0.0001).vertices.sortBy(x=> -(x._2)).collect.foreach(println)
    (1,1.7924127957615186)
    (6,0.9969646507526428)
    (2,0.9969646507526428)
    (4,0.9688717814927128)
    (3,0.6996243163176442)
    (5,0.5451618049228396)
    
    
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
    

4.2 连通分量–Connected Component

  • 连通分量是一个子图,其中任何两个顶点通过一条边或一系列边相互连接,其顶点是原始图顶点集的子集,其边是原始图边集的子集
复制代码
    class Graph[VD, ED] {
      def connectedComponents(): Graph[VertexID, ED]
    }
    
    
      
      
      
    

示例参考博客:GraphX之Connected Components

首先准备数据源
links.csv

复制代码
    1,2,friend
    1,3,sister
    2,4,brother
    3,2,boss
    4,5,client
    1,9,friend
    6,7,cousin
    7,9,coworker
    8,9,father
    10,11,colleague
    10,12,colleague
    11,12,colleague
    
    
      
      
      
      
      
      
      
      
      
      
      
      
    

people.csv

复制代码
    4,Dave,25
    6,Faith,21
    8,Harvey,47
    2,Bob,18
    1,Alice,20
    3,Charlie,30
    7,George,34
    9,Ivy,21
    5,Eve,30
    10,Lily,35
    11,Helen,35
    12,Ann,35
    
    
      
      
      
      
      
      
      
      
      
      
      
      
    

结构图:
在这里插入图片描述

复制代码
    scala> import org.apache.spark.graphx._
    
    scala> case class Person(name:String,age:Int)
    
    scala> val people =sc.textFile("file:///root/test/people.csv")
    
    scala> val peopleRDD=people.map(line=>line.split(',')).map(x=>(x(0).toLong,Person(x(1),x(2).toInt)))
    
    scala> val links=sc.textFile("file:///root/test/links.csv")
    
    scala> val linkRDD=links.map(x=>{val row =x.split(',');Edge(row(0).toLong,row(1).toLong,row(2))})
    
    scala> val graph=Graph(peopleRDD,linkRDD)
    //使用connectedComponents
    scala> val cc=graph.connectedComponents
    
    scala> cc.vertices.collect.foreach(println)
    (4,1)
    (11,10)
    (1,1)
    (6,1)
    (8,1)
    (3,1)
    (9,1)
    (7,1)
    (12,10)
    (10,10)
    (5,1)
    (2,1)
    //从结果中可以看到通过计算之后的图,每个顶点多了一个属性,这个属性表示的就是这个顶点所在的连通图中的最小顶点id。
    //例如顶点11所在的连通图中的最小顶点id是10,顶点4所在的连通图中的最小顶点id是1。
    //显然cc的格式还不是我们想要的。
    //经过connectedComponents得到的结果,可以知道哪些顶点在一个连通图中,这样就可以将一个大图拆分成若干个连通子图。
    
    scala>val newGraph= cc.outerJoinVertices(peopleRDD)((id,mincc,people)=>(mincc,people.get.name,people.get.age))
    
    scala> newGraph.vertices.collect.foreach(println)
    (4,(1,Dave,25))
    (11,(10,Helen,35))
    (1,(1,Alice,20))
    (6,(1,Faith,21))
    (8,(1,Harvey,47))
    (3,(1,Charlie,30))
    (9,(1,Ivy,21))
    (7,(1,George,34))
    (12,(10,Ann,35))
    (10,(10,Lily,35))
    (5,(1,Eve,30))
    (2,(1,Bob,18))
    
    
    scala> cc.vertices.map(_._2).collect.distinct.foreach(mincc=>{val sub=newGraph.subgraph(vpred=(id,attr)=> attr._1==mincc);println(sub.triplets.collect.foreach(println))})
    ((1,(1,Alice,20)),(2,(1,Bob,18)),friend)
    ((1,(1,Alice,20)),(3,(1,Charlie,30)),sister)
    ((1,(1,Alice,20)),(9,(1,Ivy,21)),friend)
    ((2,(1,Bob,18)),(4,(1,Dave,25)),brother)
    ((3,(1,Charlie,30)),(2,(1,Bob,18)),boss)
    ((4,(1,Dave,25)),(5,(1,Eve,30)),client)
    ((6,(1,Faith,21)),(7,(1,George,34)),cousin)
    ((7,(1,George,34)),(9,(1,Ivy,21)),coworker)
    ((8,(1,Harvey,47)),(9,(1,Ivy,21)),father)
    ()
    ((10,(10,Lily,35)),(11,(10,Helen,35)),colleague)
    ((10,(10,Lily,35)),(12,(10,Ann,35)),colleague)
    ((11,(10,Helen,35)),(12,(10,Ann,35)),colleague)
    ()
    /*分析:
    1、通过connectedComponents得到的新图的顶点属性已经没有了原始的那些信息,所以需要和原始信息作一个join,例如val newGraph = cc.outerJoinVertices(peopleRDD)((id, cc, p)=>(cc,p.get.name,p.get.age))
    2、cc.vertices.map(_._2).collect.distinct会得到所有连通图中id最小的顶点编号
    3、通过连通图中最小顶点编号,使用subgraph方法得到每个连通子图
    */
    
    
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
    

总结一下实现思路:

  • 1、已有图graph,通过connectedComponents可以得到每个顶点所在的连通图中的最小顶点id,格式:(某顶点id,该顶点连通图最小顶点id),也是一个graph,取名为cc。
  • 2、将这个最小id加入到对应顶点的属性中,可以使用outerJoinVertices,得到一个新的graph。
  • 3、可以使用subgraph取出子图。

5、Pregel计算框架

可用于导航路线的优选。

复制代码
    class Graph[VD, ED] {  
    def pregel[A](initialMsg: A, maxIterations: Int, activeDirection: EdgeDirection)(
      vprog: (VertexID, VD, A) => VD,
      sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexID,A)],
      mergeMsg: (A, A) => A)
    : Graph[VD, ED]
    }
    
    
      
      
      
      
      
      
      
    
  • initialMsg:在“superstep 0”之前发送至顶点的初始消息
    • maxIterations:将要执行的最大迭代次数
    • activeDirection:发送消息方向(默认是出边方向:EdgeDirection.Out)
    • vprog:用户定义函数,用于顶点接收消息
    • sendMsg:用户定义的函数,用于确定下一个迭代发送的消息及发往何处
    • mergeMsg:用户定义的函数,在vprog前,合并到达顶点的多个消息

preple原理分析:<>

复制代码
    import org.apache.spark.graphx._
    import org.apache.spark.rdd.RDD
    import org.apache.spark.{SparkConf, SparkContext}
    
    object PregelDemo {
      def main(args: Array[String]): Unit = {
    //1、创建SparkContext
    val conf = new SparkConf().setMaster("local[1]").setAppName("PregelDemo")
    val sc = SparkContext.getOrCreate(conf)
    
    //2、创建顶点
    val vertexArray = Array(
      (1L, ("Alice", 28)),
      (2L, ("Bob", 27)),
      (3L, ("Charlie", 65)),
      (4L, ("David", 42)),
      (5L, ("Ed", 55)),
      (6L, ("Fran", 50))
    )
    val vertexRDD: RDD[(VertexId, (String,Int))] = sc.makeRDD(vertexArray)
    
    //3、创建边,边的属性代表 相邻两个顶点之间的距离
    val edgeArray = Array(
      Edge(2L, 1L, 7),
      Edge(2L, 4L, 2),
      Edge(3L, 2L, 4),
      Edge(3L, 6L, 3),
      Edge(4L, 1L, 1),
      Edge(2L, 5L, 2),
      Edge(5L, 3L, 8),
      Edge(5L, 6L, 3)
    )
    val edgeRDD: RDD[Edge[Int]] = sc.makeRDD(edgeArray)
    //4、创建图(使用apply方式创建)
    val graph = Graph(vertexRDD, edgeRDD)
    
    //被计算的图中,起始顶点的id
    val srcVertexId=5L
    val initialGraph:Graph[Double,Int]=graph.mapVertices{case (vid,(name,age))=>if(vid==srcVertexId) 0.0 else Double.PositiveInfinity}
    
    //initialGraph.vertices.collect().foreach(println)
    
    val pregelGraph:Graph[Double,Int]=initialGraph.pregel(
      Double.PositiveInfinity,
      Int.MaxValue,
      EdgeDirection.Out
    )(
      (vid:VertexId,vd:Double,disMsg:Double)=>{
        val minDist:Double=math.min(vd,disMsg)
        println("vprog"+vid+" "+vd+" "+disMsg+" "+minDist)
        minDist
      },
      (edgeTriplet:EdgeTriplet[Double,PartitionID])=>{
        if (edgeTriplet.srcAttr+edgeTriplet.attr<edgeTriplet.dstAttr){
          println("sendMsg:"+edgeTriplet.srcId+" "+edgeTriplet.srcAttr+" "+edgeTriplet.dstId+" "+edgeTriplet.dstAttr)
          Iterator[(VertexId,Double)]((
            edgeTriplet.dstId,edgeTriplet.srcAttr+edgeTriplet.attr
          ))
        }else{
          Iterator.empty
        }
      },
      (msg1:Double,msg2:Double)=>{
        math.min(msg1,msg2)
      }
    )
    pregelGraph.vertices.collect().foreach(println)
      }
    }
    /*输出:
    vprog4 Infinity Infinity Infinity
    vprog1 Infinity Infinity Infinity
    vprog6 Infinity Infinity Infinity
    vprog3 Infinity Infinity Infinity
    vprog5 0.0 Infinity 0.0
    vprog2 Infinity Infinity Infinity
    sendMsg:5 0.0 3 Infinity
    sendMsg:5 0.0 6 Infinity
    vprog6 Infinity 3.0 3.0
    vprog3 Infinity 8.0 8.0
    sendMsg:3 8.0 2 Infinity
    vprog2 Infinity 12.0 12.0
    sendMsg:2 12.0 1 Infinity
    sendMsg:2 12.0 4 Infinity
    vprog4 Infinity 14.0 14.0
    vprog1 Infinity 19.0 19.0
    sendMsg:4 14.0 1 19.0
    vprog1 19.0 15.0 15.0
    (4,14.0)
    (1,15.0)
    (6,3.0)
    (3,8.0)
    (5,0.0)
    (2,12.0)
    */
    
    
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
    

全部评论 (0)

还没有任何评论哟~