基于Spark GraphX的图形数据分析
目录
-
1、为什么需要图计算
-
2、图的概念
-
-
2.1 图的基本概念及应用场景
-
2.2 图的术语
-
- 2.2.1 顶点(Vertex)和边(Edge)
- 2.2.2 有向图和无向图
- 2.2.3 有环图和无环图
- 2.2.4 度
-
2.3 图的经典表示法
-
-
3、Spark GraphX
-
- 3.1 简介
- 3.2 GraphX核心抽象
- 3.3 GraphX API
- 3.4 图的算子
-
- 3.4.1 属性算子(数据变换)
- 3.4.2 结构算子(结构变换)
- 3.4.3 join算子
- 3.4.4 练习:找出用户粉丝数量
-
4、常用图算法
-
- 4.1 页面排名算法--PageRank
- 4.2 连通分量--Connected Component
-
5、Pregel计算框架
1、为什么需要图计算
- 许多大数据以大规模图或网络的形式呈现
- 许多非图结构的大数据,常会被转换为图模型进行分析
- 图数据结构很好地表达了数据之间的关联性
2、图的概念
2.1 图的基本概念及应用场景
图是由顶点集合(vertex)及顶点间的关系集合(边edge)组成的一种网状数据结构
- 通常表示为二元组:Gragh=(V,E)
- 可以对事物之间的关系建模
应用场景
- 在地图应用中寻找最短路径
- 社交网络关系
- 网页间超链接关系
2.2 图的术语
2.2.1 顶点(Vertex)和边(Edge)
一般关系图中,事物为顶点,关系为边
定义一个图:
Graph=(V,E)
集合V={v1,v2,v3}
集合E={(v1,v2),(v1,v3),(v2,v3)}

2.2.2 有向图和无向图
- 有向图 :在有向图中,一条边的两个顶点一般扮演者不同的角色,比如父子关系、页面A连接向页面B;
G=(V,E)
V={A,B,C,D,E}
E={<A,B>,<B,C>,<B,D>,<C,E>,<D,A>,<E,D>} //关系用尖括号表示

- 无向图 :在一个无向图中,边没有方向,即关系都是对等的,比如qq中的好友。
G=(V,E)
V={A,B,C,D,E}
E={(A,B),(A,D),(B,C),(B,D),(C,E),(D,E)}

2.2.3 有环图和无环图
-
有环图 :包含一系列顶点连接的回路(环路),有环图是包含循环的,一系列顶点连接成一个环,在有环图中,如果不关心终止条件,算法可能永远在环上执行,无法退出。

-
无环图 :不包含循环,不能形成环,DAG即为有向无环图

2.2.4 度
指一个顶点所有边的数量。
- 出度:指从当前顶点指向其他顶点的边的数量
- 入度:其他顶点指向当前顶点的边的数量

2.3 图的经典表示法
邻接矩阵:

1、对于每条边,矩阵中相应单元格值为1
2、对于每个循环,矩阵中相应单元格值为2,方便在行或列上求得顶点度数
3、Spark GraphX
3.1 简介
GraphX是Spark提供分布式图计算API
GraphX特点:
* 基于内存实现了数据的复用与快速读取
* 通过弹性分布式属性图(Property Graph)统一了图视图与表视图
* 与Spark Streaming、Spark SQL和Spark MLlib等无缝衔接
针对某些领域,如社交网络、语言建模等,graph-parallel系统可以高效地执行复杂的图形算法,比一般的data-parallel系统更快
Graphx是将graph-parallel的data-parallel统一到一个系统中。允许用户将数据当成一个图或一个集合RDD,而简化数据移动或复杂操作。
3.2 GraphX核心抽象
弹性分布式属性图(Resilient Distributed Property Graph)
* 顶点和边都带属性的有向多重图


- 一份物理存储,两种视图

对Graph视图的所有操作,最终都会转换成其关联的Table视图的RDD操作来完成。
3.3 GraphX API
- Graph[VD,ED]
- VertexRDD[VD]
- EdgeRDD[ED]
- EdgeTriplet[VD,ED]
- Edge:样例类
- VertexId:Long的别名
maven工程需要下载依赖:
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-graphx_2.11</artifactId>
<version>2.1.1</version>
</dependency>
import org.apache.spark.graphx.{Edge, Graph}
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object GraphxDemo1 {
def main(args: Array[String]): Unit = {
val conf :SparkConf= new SparkConf().setAppName("graphxDemo1").setMaster("local[2]")
val sc = SparkContext.getOrCreate(conf)
//创建vertices顶点rdd,(1L,1)中的1L代表顶点,1代表该顶点的属性
val vd:RDD[(Long,Int)]=sc.makeRDD(Seq((1L,1),(2L,2),(3L,3)))
//创建edges边rdd,(Edge(1L,2L,1)中的1L和2L代表顶点,1代表两顶点间的关系
val ed:RDD[Edge[Int]]=sc.makeRDD(Seq(Edge(1L,2L,1),Edge(2L,3L,2)))
//创建graph对象
val graph=Graph(vd,ed)
//获取graph图对象的顶点信息
graph.vertices.collect.foreach(println)
graph.vertices.foreach(x=>println(s"${x._1}-->${x._2}"))
//获取graph图对象的边信息
graph.edges.collect.foreach(println)
graph.edges.foreach(x=>println(s"src:${x.srcId},dst:${x.dstId},attr:${x.attr}"))
//获取顶点和边的整体信息
graph.triplets.collect.foreach(println)
}
}
/*
(2,2)
(1,1)
(3,3)
1-->1
3-->3
2-->2
Edge(1,2,1)
Edge(2,3,2)
src:1,dst:2,attr:1
src:2,dst:3,attr:2
((1,1),(2,2),1)
((2,2),(3,3),2)
*/
关于vertices、edges、triplets所表示的含义如下图:

Spark shell需要导入Spark Graph包
//导入Spark Graph包
scala> import org.apache.spark.graphx._
//通过文件加载
followers.txt内容是
2 3
1 4
3 2
4 3
scala> val graphLoad=GraphLoader.edgeListFile(sc,"file:///root/test/followers.txt")
graphLoad: org.apache.spark.graphx.Graph[Int,Int] = org.apache.spark.graphx.impl.GraphImpl@6d0c8cd0
scala> graphLoad.vertices.collect
res6: Array[(org.apache.spark.graphx.VertexId, Int)] = Array((4,1), (1,1), (3,1), (2,1))
scala> graphLoad.edges.collect
res7: Array[org.apache.spark.graphx.Edge[Int]] = Array(Edge(1,4,1), Edge(2,3,1), Edge(3,2,1), Edge(4,3,1))
scala> graphLoad.triplets.collect
res8: Array[org.apache.spark.graphx.EdgeTriplet[Int,Int]] = Array(((1,1),(4,1),1), ((2,1),(3,1),1), ((3,1),(2,1),1), ((4,1),(3,1),1))
属性图应用示例:构建用户合作关系属性图
- 顶点属性:用户名、职业
- 边属性:合作关系

import org.apache.spark.graphx.{Edge, Graph}
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object GraphDemo2 {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local[1]").setAppName("graphDemo2")
val sc = SparkContext.getOrCreate(conf)
val user:RDD[(Long,(String,String))] = sc.parallelize(Array((3L,("rxin","student")),(7L,("jgonzal","postdoc")),(5L,("franklin","professor")),(2L,("istoica","professor"))))
val relationship:RDD[Edge[String]]=sc.parallelize(Array(Edge(3L,7L,"Collaborator"),Edge(5L,3L,"Advisor"),Edge(2L,5L,"Colleague"),Edge(5L,7L,"PI")))
val graphUser=Graph(user,relationship)
graphUser.vertices.collect.foreach(println)
graphUser.edges.collect.foreach(println)
graphUser.triplets.collect.foreach(println)
}
}
/*
(3,(rxin,student))
(7,(jgonzal,postdoc))
(5,(franklin,professor))
(2,(istoica,professor))
Edge(2,5,Colleague)
Edge(3,7,Collaborator)
Edge(5,3,Advisor)
Edge(5,7,PI)
((2,(istoica,professor)),(5,(franklin,professor)),Colleague)
((3,(rxin,student)),(7,(jgonzal,postdoc)),Collaborator)
((5,(franklin,professor)),(3,(rxin,student)),Advisor)
((5,(franklin,professor)),(7,(jgonzal,postdoc)),PI)
*/
属性图应用示例:
-
构建用户社交网络关系
- 顶点:用户名、年龄
- 边:打call次数
-
找出大于30岁的用户
-
假设打call超过5次,表示真爱。请找出他(她)们

import org.apache.spark.graphx.{Edge, EdgeTriplet, Graph}
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object GraphDemo3 {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local[1]").setAppName("graphDemo3")
val sc = SparkContext.getOrCreate(conf)
//构建用户社交网络关系
val user:RDD[(Long,(String,Int))] = sc.parallelize(
Array(
(1L, ("Alice", 28)),
(2L, ("Bob", 27)),
(3L, ("Charlie", 65)),
(4L, ("David", 42)),
(5L, ("Ed", 55)),
(6L, ("Fran", 50))
))
val userCall:RDD[Edge[Int]] = sc.parallelize(
Array(
Edge(2L, 1L, 7),
Edge(4L, 1L, 1),
Edge(5L, 2L, 2),
Edge(3L, 2L, 4),
Edge(5L, 3L, 8),
Edge(2L, 4L, 2),
Edge(5L, 6L, 3),
Edge(3L, 6L, 7)
))
val graphUserCall=Graph(user,userCall)
//找出大于30岁的用户
//方法1:利用元组,(1L, ("Alice", 28))是二维元组,
graphUserCall.vertices.filter(x=>x._2._2>30).collect().foreach(println)
//方法2:利用样例类
graphUserCall.vertices.filter{case(id,(name,age))=>age>30}.collect().foreach(println)
//假设打call超过5次,表示真爱。请找出,格式:*** like ***,stage:*
// graphUserCall.triplets.collect().foreach(println)的输出格式是((2,(Bob,27)),(1,(Alice,28)),7)
//看着像三元组,实际并不是,它的类型是RDD[EdgeTriplet[(String,Int),Int]]
//val triplets:RDD[EdgeTriplet[(String,Int),Int]] = graphUserCall.triplets
//所以千万不能用三元组处理!
//可以使用算子srcAttr,dstAttr,attr
//((2,(Bob,27)),(1,(Alice,28)),7)中,
// srcAttr代表(Bob,27),dstAttr代表(Alice,28),attr代表7
//而(Bob,27),(Alice,28)是二元组,这样就好解决了。
graphUserCall.triplets.filter(x=>x.attr>5).collect()
.foreach(x=>println(x.srcAttr._1+" like "+x.dstAttr._1+",stage:"+x.attr))
}
}
/*输出:
(4,(David,42))
(6,(Fran,50))
(3,(Charlie,65))
(5,(Ed,55))
Bob like Alice,stage:7
Charlie like Fran,stage:7
Ed like Charlie,stage:8
*/
查看图信息
- 顶点数量:
val numVertices: Long - 边数量:
val numEdges: Long - 度:
val degrees: VertexRDD[Int]- 入度:
val inDegrees: VertexRDD[Int] - 出度:
val outDegrees: VertexRDD[Int]
- 入度:
//使用上述的示例
println(graphUserCall.numVertices) //输出:6
println(graphUserCall.numEdges) //输出:8
graphUserCall.degrees.collect().foreach(println) //输出:(4,2)(1,2)(6,2)(3,3)(5,3)(2,4)
graphUserCall.inDegrees.collect().foreach(println) //输出:(4,1)(1,2)(6,2)(3,1)(2,2)
graphUserCall.outDegrees.collect().foreach(println)//输出:(4,1)(3,2)(5,3)(2,2)
3.4 图的算子
3.4.1 属性算子(数据变换)
用于属性图属性数据变换
- 类似于RDD的map操作
- mapVertices–>修改顶点的属性
- mapEdges–>修改边的关系(也只是改变值,不改变结构)
- mapTriplets
class Graph[VD, ED] {
def mapVertices[VD2](map: (VertexId, VD) => VD2): Graph[VD2, ED]
def mapEdges[ED2](map: Edge[ED] => ED2): Graph[VD, ED2]
def mapTriplets[ED2](map: EdgeTriplet[VD, ED] => ED2): Graph[VD, ED2]
}
使用上述示例:用户社交网络关系(打call),这次用spark-shell进行操作。
//导包
scala> import org.apache.spark.graphx._
//创建顶点RDD
scala> val user=sc.parallelize(Array(
| (1L,("Alice",28)),
| (2L,("Bob",27)),
| (3L,("Charlie",65)),
| (4L,("David",42)),
| (5L,("Ed",55)),
| (6L,("Age",50))))
//创建边RDD
scala> val userCall=sc.parallelize(
| Array(
| Edge(4L,1L,1),
| Edge(2L,1L,7),
| Edge(5L,2L,2),
| Edge(3L,2L,4),
| Edge(5L,3L,8),
| Edge(2L,4L,2),
| Edge(5L,6L,3),
| Edge(3L,6L,3)))
//创建graph对象
scala> val graphUserCall=Graph(user,userCall)
scala> graphUserCall.vertices.collect.foreach(println)
(4,(David,42))
(1,(Alice,28))
(6,(Age,50))
(3,(Charlie,65))
(5,(Ed,55))
(2,(Bob,27))
//获取graph图对象的顶点信息
scala> graphUserCall.vertices.collect.foreach(println)
(4,(David,42))
(1,(Alice,28))
(6,(Age,50))
(3,(Charlie,65))
(5,(Ed,55))
(2,(Bob,27))
//现需要改变顶点属性
//方法1:使用模式匹配
scala> val t1_graph=graphUserCall.mapVertices{case(vertextId,(name,age))=>(vertextId,name)}
scala> t1_graph.vertices.collect.foreach(println)
(4,(4,David))
(1,(1,Alice))
(6,(6,Age))
(3,(3,Charlie))
(5,(5,Ed))
(2,(2,Bob))
//方法2:使用元组
scala> val t2_graph=graphUserCall.mapVertices((id,attr)=>(id,attr._1))
scala> t2_graph.vertices.collect.foreach(println)
(4,(4,David))
(1,(1,Alice))
(6,(6,Age))
(3,(3,Charlie))
(5,(5,Ed))
(2,(2,Bob))
//获取graph图对象的边信息
scala> graphUserCall.edges.collect.foreach(println)
Edge(2,1,7)
Edge(2,4,2)
Edge(3,2,4)
Edge(3,6,3)
Edge(4,1,1)
Edge(5,2,2)
Edge(5,3,8)
Edge(5,6,3)
//需求:将打call次数乘以7
scala> val t3_graph=graphUserCall.mapEdges(x=>Edge(x.srcId,x.dstId,x.attr*7.0))
scala> t3_graph.edges.collect.foreach(println)
Edge(2,1,Edge(2,1,49.0))
Edge(2,4,Edge(2,4,14.0))
Edge(3,2,Edge(3,2,28.0))
Edge(3,6,Edge(3,6,21.0))
Edge(4,1,Edge(4,1,7.0))
Edge(5,2,Edge(5,2,14.0))
Edge(5,3,Edge(5,3,56.0))
Edge(5,6,Edge(5,6,21.0))
//显然这样不是我们想要的,mapEdges(x=>...)里的x是指的是打call次数,而不是Edge(2,1,7)整体
scala> val t3_graph=graphUserCall.mapEdges(x=>x.attr*7.0)
scala> t3_graph.edges.collect.foreach(println)
Edge(2,1,49.0)
Edge(2,4,14.0)
Edge(3,2,28.0)
Edge(3,6,21.0)
Edge(4,1,7.0)
Edge(5,2,14.0)
Edge(5,3,56.0)
Edge(5,6,21.0)
//需求:打call次数加上出度顶点的年龄,使用mapTriplets
scala> val t4_graphUserCall=graphUserCall.mapTriplets(x=>(x.attr+x.srcAttr._2))
scala> t4_graphUserCall.triplets.collect.foreach(println)
((2,(Bob,27)),(1,(Alice,28)),34)
((2,(Bob,27)),(4,(David,42)),29)
((3,(Charlie,65)),(2,(Bob,27)),69)
((3,(Charlie,65)),(6,(Age,50)),68)
((4,(David,42)),(1,(Alice,28)),43)
((5,(Ed,55)),(2,(Bob,27)),57)
((5,(Ed,55)),(3,(Charlie,65)),63)
((5,(Ed,55)),(6,(Age,50)),58)
/*总结一下:
1、mapVertices,修改的是顶点的属性VD
2、mapEdges,修改的是边的关系ED
3、mapTriplets,修改的是边的关系ED
*/
注意:
- 这里每一个操作产生一个新图,其顶点和边被用户定义的map函数修改了。
- 在每一个实例图结构不受影响,仅仅是属性图属性数据变换。
3.4.2 结构算子(结构变换)
属性图结构变换
- reverse:改变边的方向
- subgraph:生成满足顶点和边的条件的子图
class Graph[VD, ED] {
def reverse: Graph[VD, ED]
def subgraph(epred: EdgeTriplet[VD,ED] => Boolean,
vpred: (VertexId, VD) => Boolean): Graph[VD, ED]
}
此处注意epred和vpred的区别 ,下面演示示例详细说明。
scala> graphUserCall.triplets.collect.foreach(println)
((2,(Bob,27)),(1,(Alice,28)),7)
((2,(Bob,27)),(4,(David,42)),2)
((3,(Charlie,65)),(2,(Bob,27)),4)
((3,(Charlie,65)),(6,(Age,50)),3)
((4,(David,42)),(1,(Alice,28)),1)
((5,(Ed,55)),(2,(Bob,27)),2)
((5,(Ed,55)),(3,(Charlie,65)),8)
((5,(Ed,55)),(6,(Age,50)),3)
//reverse 改变边的方向
scala> val reverse_graphUserCall=graphUserCall.reverse
scala> reverse_graphUserCall.triplets.collect.foreach(println)
((1,(Alice,28)),(2,(Bob,27)),7)
((1,(Alice,28)),(4,(David,42)),1)
((2,(Bob,27)),(3,(Charlie,65)),4)
((2,(Bob,27)),(5,(Ed,55)),2)
((3,(Charlie,65)),(5,(Ed,55)),8)
((4,(David,42)),(2,(Bob,27)),2)
((6,(Age,50)),(3,(Charlie,65)),3)
((6,(Age,50)),(5,(Ed,55)),3)
//subgraph之vpred
//vpred: (VertexId, VD) => Boolean
//注意匿名函数的参数是顶点信息,也就意味着会每个顶点进行判断
//需求:截取年龄小于65的子图(包括所有顶点)
scala> graphUserCall.subgraph(vpred=(id,attr)=>attr._2<65).triplets.collect.foreach(println)
((2,(Bob,27)),(1,(Alice,28)),7)
((2,(Bob,27)),(4,(David,42)),2)
((4,(David,42)),(1,(Alice,28)),1)
((5,(Ed,55)),(2,(Bob,27)),2)
((5,(Ed,55)),(6,(Age,50)),3)
//subgraph之epred
//epred: EdgeTriplet[VD,ED] => Boolean
//注意匿名函数的参数是顶点和边的整体信息,如果需要过滤顶点信息,需要使用srcAttr和dstAttr
//需求:截取年龄小于65的子图(仅过滤出度顶点,也就是起始顶点)
scala> graphUserCall.subgraph(epred=(ep)=>ep.srcAttr._2<65).triplets.collect.foreach(println)
((2,(Bob,27)),(1,(Alice,28)),7)
((2,(Bob,27)),(4,(David,42)),2)
((4,(David,42)),(1,(Alice,28)),1)
((5,(Ed,55)),(2,(Bob,27)),2)
((5,(Ed,55)),(3,(Charlie,65)),8) //比vpred多了此条
((5,(Ed,55)),(6,(Age,50)),3)
3.4.3 join算子
从外部的RDDs加载数据,修改顶点属性。
- joinVertices
- outerJoinVertices
class Graph[VD, ED] {
def joinVertices[U](table: RDD[(VertexId, U)])(map: (VertexId, VD, U) => VD): Graph[VD, ED]
def outerJoinVertices[U, VD2](table: RDD[(VertexId, U)])(map: (VertexId, VD, Option[U]) => VD2)
: Graph[VD2, ED]
}
//仍使用打call的示例
//新创建一个RDD
scala> val two=sc.makeRDD(Array((1L,"kgc.cn"),(2L,"qq.com"),(3L,"163.com"),(7L,"so.com")))
//joinVertices
//用法:graph.joinVertices(RDD)
scala> graphUserCall.joinVertices(two)((id,v,cmpy)=>(v._1+"@"+cmpy,v._2)).vertices.collect.foreach(println)
(4,(David,42))
(1,(Alice@kgc.cn,28))
(6,(Age,50))
(3,(Charlie@163.com,65))
(5,(Ed,55))
(2,(Bob@qq.com,27))
//outerJoinVertices
//用法:graph.outerJoinVertices(RDD)
scala> graphUserCall.outerJoinVertices(two)((id,v,cmpy)=>(v._1+"@"+cmpy,v._2)).vertices.collect.foreach(println)
(4,(David@None,42)) //RDD中的顶点不匹配时,值为None
(1,(Alice@Some(kgc.cn),28))
(6,(Age@None,50))
(3,(Charlie@Some(163.com),65))
(5,(Ed@None,55))
(2,(Bob@Some(qq.com),27))
属性算子mapVertices可以修改顶点的属性,join算子也可以修改顶点的属性,区别是join算子是从外部的RDDs加载数据。
需求:计算用户粉丝数量
scala> case class User(name:String,age:Int,inDeg:Int,outDeg:Int)
//将顶点入度、出度存入顶点属性中
scala> val t2_graph=graphUserCall.outerJoinVertices(graphUserCall.inDegrees){case(id,u,indeg)=>User(u._1,u._2,indeg.getOrElse(0),0)}.outerJoinVertices(graphUserCall.outDegrees){case(id,u,outdeg)=>User(u.name,u.age,u.inDeg,outdeg.getOrElse(0))}
scala> t2_graph.vertices.collect.foreach(println)
(4,User(David,42,1,1))
(1,User(Alice,28,2,0))
(6,User(Age,50,2,0))
(3,User(Charlie,65,1,2))
(5,User(Ed,55,0,3))
(2,User(Bob,27,2,2))
3.4.4 练习:找出用户粉丝数量
需求:现有数据格式如下,创建图并计算每个用户的粉丝数量。
格式:((User*, ),(User ,*))
- (User*, *)=(用户名,用户ID)
- 第一个用户表示被跟随者(followee)
- 第二个用户表示跟随者(follower)

import org.apache.spark.graphx.{Edge, Graph}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.apache.spark.{SparkConf, SparkContext}
import scala.util.matching.Regex
object Test01 {
def main(args: Array[String]): Unit = {
// 创建SparkSession
val spark: SparkSession = SparkSession.builder()
.appName(this.getClass.getName)
.master("local[1]")
.getOrCreate()
// 创建SparkContext
val sc: SparkContext = spark.sparkContext
// 编写正则表达式, 用于提取字段
val pattern: Regex =
"""\(\((User\d+,\d+)\),\((User\d+,\d+)\)\)""".r
// 读取数据文件
val twitters: RDD[(Array[String], Array[String])] = sc.textFile("D:\ 上课PPT\ 直播课资料\ 2-Spark阶段\ 02-spark\ twitter_graph_data.txt")
// 正则表达式的模式匹配, 返回一个Option数据类型
.map(line => line match {
case pattern(followee, follower) => (Some(followee), Some(follower))
case _ => (None, None)
// 过滤掉值为None的
}).filter(x => x._1 != None && x._2 != None)
// 正则表达式模式匹配后返回两组值, 分别对这两组值进行切割处理, 返回一个(Array[String], Array[String])
.map(x => (x._1.get.split(","), x._2.get.split(",")))
// 使用flatMap对数据进行扁平化处理, 构建点集合
// 也可以使用union对两个RDD求并集, 这里不做演示
val verts: RDD[(Long, String)] = twitters.flatMap(x => Array((x._1(1).toLong, x._1(0)), (x._2(1).toLong, x._2(0)))).distinct()
// 构建边集合
val edges: RDD[Edge[String]] = twitters.map(x => Edge(x._2(1).toLong, x._1(1).toLong, "follow"))
// 构建图
// 有可能会出现一种情况, 在边集合中出现的点在点集合中不存在
val graph = Graph(verts,edges,"")
// 求入度, 按照入度的值进行倒序排列
// 注意: 要想实现全局有序, 需要重新分成一个区
graph.inDegrees.repartition(1).sortBy(x=>x._2,false).foreach(println)
}
}
/*输出结果:
(123004655,56)
(36851222,56)
(59804598,54)
(63644892,46)
(14444530,42)
......
./
4、常用图算法
4.1 页面排名算法–PageRank
- 用于评估网页链接的质量和数量,以确定该网页的重要性和权威性的相对分数,范围为0到10
- 从本质上讲,PageRank是找出图中顶点(网页链接)的重要性
- GraphX提供了PageRank API用于计算图的PageRank
原理部分详见:PageRank算法原理剖析及Spark实现
class Graph[VD, ED] {
def pageRank(tol: Double, resetProb: Double = 0.15): Graph[Double, Double]
}
//tol:收敛时允许的误差,越小越精确, 确定迭代是否结束的参数
//resetProb:随机重置概率
//返回的仍是Graph,格式:(1,1.5453030618621122),本质上是找出顶点的重要性。
//使用不同的收敛误差,结果进行对比
scala> graphUserCall.pageRank(0.1).vertices.sortBy(x=> -(x._2)).collect.foreach(println)
(1,1.5453030618621122)
(4,1.035409289731306)
(6,1.0247865028119143)
(2,1.0247865028119143)
(3,0.7698396167465112)
(5,0.5998750260362425)
scala> graphUserCall.pageRank(0.01).vertices.sortBy(x=> -(x._2)).collect.foreach(println)
(1,1.7757164399923602)
(6,1.0009207604397985)
(2,1.0009207604397985)
(4,0.9727164143364966)
(3,0.7024005336419639)
(5,0.5473250911495823)
scala> graphUserCall.pageRank(0.001).vertices.sortBy(x=> -(x._2)).collect.foreach(println)
(1,1.7924127957615186)
(6,0.9969646507526428)
(2,0.9969646507526428)
(4,0.9688717814927128)
(3,0.6996243163176442)
(5,0.5451618049228396)
scala> graphUserCall.pageRank(0.0001).vertices.sortBy(x=> -(x._2)).collect.foreach(println)
(1,1.7924127957615186)
(6,0.9969646507526428)
(2,0.9969646507526428)
(4,0.9688717814927128)
(3,0.6996243163176442)
(5,0.5451618049228396)
4.2 连通分量–Connected Component
- 连通分量是一个子图,其中任何两个顶点通过一条边或一系列边相互连接,其顶点是原始图顶点集的子集,其边是原始图边集的子集
class Graph[VD, ED] {
def connectedComponents(): Graph[VertexID, ED]
}
示例参考博客:GraphX之Connected Components
首先准备数据源
links.csv
1,2,friend
1,3,sister
2,4,brother
3,2,boss
4,5,client
1,9,friend
6,7,cousin
7,9,coworker
8,9,father
10,11,colleague
10,12,colleague
11,12,colleague
people.csv
4,Dave,25
6,Faith,21
8,Harvey,47
2,Bob,18
1,Alice,20
3,Charlie,30
7,George,34
9,Ivy,21
5,Eve,30
10,Lily,35
11,Helen,35
12,Ann,35
结构图:

scala> import org.apache.spark.graphx._
scala> case class Person(name:String,age:Int)
scala> val people =sc.textFile("file:///root/test/people.csv")
scala> val peopleRDD=people.map(line=>line.split(',')).map(x=>(x(0).toLong,Person(x(1),x(2).toInt)))
scala> val links=sc.textFile("file:///root/test/links.csv")
scala> val linkRDD=links.map(x=>{val row =x.split(',');Edge(row(0).toLong,row(1).toLong,row(2))})
scala> val graph=Graph(peopleRDD,linkRDD)
//使用connectedComponents
scala> val cc=graph.connectedComponents
scala> cc.vertices.collect.foreach(println)
(4,1)
(11,10)
(1,1)
(6,1)
(8,1)
(3,1)
(9,1)
(7,1)
(12,10)
(10,10)
(5,1)
(2,1)
//从结果中可以看到通过计算之后的图,每个顶点多了一个属性,这个属性表示的就是这个顶点所在的连通图中的最小顶点id。
//例如顶点11所在的连通图中的最小顶点id是10,顶点4所在的连通图中的最小顶点id是1。
//显然cc的格式还不是我们想要的。
//经过connectedComponents得到的结果,可以知道哪些顶点在一个连通图中,这样就可以将一个大图拆分成若干个连通子图。
scala>val newGraph= cc.outerJoinVertices(peopleRDD)((id,mincc,people)=>(mincc,people.get.name,people.get.age))
scala> newGraph.vertices.collect.foreach(println)
(4,(1,Dave,25))
(11,(10,Helen,35))
(1,(1,Alice,20))
(6,(1,Faith,21))
(8,(1,Harvey,47))
(3,(1,Charlie,30))
(9,(1,Ivy,21))
(7,(1,George,34))
(12,(10,Ann,35))
(10,(10,Lily,35))
(5,(1,Eve,30))
(2,(1,Bob,18))
scala> cc.vertices.map(_._2).collect.distinct.foreach(mincc=>{val sub=newGraph.subgraph(vpred=(id,attr)=> attr._1==mincc);println(sub.triplets.collect.foreach(println))})
((1,(1,Alice,20)),(2,(1,Bob,18)),friend)
((1,(1,Alice,20)),(3,(1,Charlie,30)),sister)
((1,(1,Alice,20)),(9,(1,Ivy,21)),friend)
((2,(1,Bob,18)),(4,(1,Dave,25)),brother)
((3,(1,Charlie,30)),(2,(1,Bob,18)),boss)
((4,(1,Dave,25)),(5,(1,Eve,30)),client)
((6,(1,Faith,21)),(7,(1,George,34)),cousin)
((7,(1,George,34)),(9,(1,Ivy,21)),coworker)
((8,(1,Harvey,47)),(9,(1,Ivy,21)),father)
()
((10,(10,Lily,35)),(11,(10,Helen,35)),colleague)
((10,(10,Lily,35)),(12,(10,Ann,35)),colleague)
((11,(10,Helen,35)),(12,(10,Ann,35)),colleague)
()
/*分析:
1、通过connectedComponents得到的新图的顶点属性已经没有了原始的那些信息,所以需要和原始信息作一个join,例如val newGraph = cc.outerJoinVertices(peopleRDD)((id, cc, p)=>(cc,p.get.name,p.get.age))
2、cc.vertices.map(_._2).collect.distinct会得到所有连通图中id最小的顶点编号
3、通过连通图中最小顶点编号,使用subgraph方法得到每个连通子图
*/
总结一下实现思路:
- 1、已有图graph,通过connectedComponents可以得到每个顶点所在的连通图中的最小顶点id,格式:(某顶点id,该顶点连通图最小顶点id),也是一个graph,取名为cc。
- 2、将这个最小id加入到对应顶点的属性中,可以使用outerJoinVertices,得到一个新的graph。
- 3、可以使用subgraph取出子图。
5、Pregel计算框架
可用于导航路线的优选。
class Graph[VD, ED] {
def pregel[A](initialMsg: A, maxIterations: Int, activeDirection: EdgeDirection)(
vprog: (VertexID, VD, A) => VD,
sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexID,A)],
mergeMsg: (A, A) => A)
: Graph[VD, ED]
}
- initialMsg:在“superstep 0”之前发送至顶点的初始消息
- maxIterations:将要执行的最大迭代次数
- activeDirection:发送消息方向(默认是出边方向:EdgeDirection.Out)
- vprog:用户定义函数,用于顶点接收消息
- sendMsg:用户定义的函数,用于确定下一个迭代发送的消息及发往何处
- mergeMsg:用户定义的函数,在vprog前,合并到达顶点的多个消息
preple原理分析:<>
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object PregelDemo {
def main(args: Array[String]): Unit = {
//1、创建SparkContext
val conf = new SparkConf().setMaster("local[1]").setAppName("PregelDemo")
val sc = SparkContext.getOrCreate(conf)
//2、创建顶点
val vertexArray = Array(
(1L, ("Alice", 28)),
(2L, ("Bob", 27)),
(3L, ("Charlie", 65)),
(4L, ("David", 42)),
(5L, ("Ed", 55)),
(6L, ("Fran", 50))
)
val vertexRDD: RDD[(VertexId, (String,Int))] = sc.makeRDD(vertexArray)
//3、创建边,边的属性代表 相邻两个顶点之间的距离
val edgeArray = Array(
Edge(2L, 1L, 7),
Edge(2L, 4L, 2),
Edge(3L, 2L, 4),
Edge(3L, 6L, 3),
Edge(4L, 1L, 1),
Edge(2L, 5L, 2),
Edge(5L, 3L, 8),
Edge(5L, 6L, 3)
)
val edgeRDD: RDD[Edge[Int]] = sc.makeRDD(edgeArray)
//4、创建图(使用apply方式创建)
val graph = Graph(vertexRDD, edgeRDD)
//被计算的图中,起始顶点的id
val srcVertexId=5L
val initialGraph:Graph[Double,Int]=graph.mapVertices{case (vid,(name,age))=>if(vid==srcVertexId) 0.0 else Double.PositiveInfinity}
//initialGraph.vertices.collect().foreach(println)
val pregelGraph:Graph[Double,Int]=initialGraph.pregel(
Double.PositiveInfinity,
Int.MaxValue,
EdgeDirection.Out
)(
(vid:VertexId,vd:Double,disMsg:Double)=>{
val minDist:Double=math.min(vd,disMsg)
println("vprog"+vid+" "+vd+" "+disMsg+" "+minDist)
minDist
},
(edgeTriplet:EdgeTriplet[Double,PartitionID])=>{
if (edgeTriplet.srcAttr+edgeTriplet.attr<edgeTriplet.dstAttr){
println("sendMsg:"+edgeTriplet.srcId+" "+edgeTriplet.srcAttr+" "+edgeTriplet.dstId+" "+edgeTriplet.dstAttr)
Iterator[(VertexId,Double)]((
edgeTriplet.dstId,edgeTriplet.srcAttr+edgeTriplet.attr
))
}else{
Iterator.empty
}
},
(msg1:Double,msg2:Double)=>{
math.min(msg1,msg2)
}
)
pregelGraph.vertices.collect().foreach(println)
}
}
/*输出:
vprog4 Infinity Infinity Infinity
vprog1 Infinity Infinity Infinity
vprog6 Infinity Infinity Infinity
vprog3 Infinity Infinity Infinity
vprog5 0.0 Infinity 0.0
vprog2 Infinity Infinity Infinity
sendMsg:5 0.0 3 Infinity
sendMsg:5 0.0 6 Infinity
vprog6 Infinity 3.0 3.0
vprog3 Infinity 8.0 8.0
sendMsg:3 8.0 2 Infinity
vprog2 Infinity 12.0 12.0
sendMsg:2 12.0 1 Infinity
sendMsg:2 12.0 4 Infinity
vprog4 Infinity 14.0 14.0
vprog1 Infinity 19.0 19.0
sendMsg:4 14.0 1 19.0
vprog1 19.0 15.0 15.0
(4,14.0)
(1,15.0)
(6,3.0)
(3,8.0)
(5,0.0)
(2,12.0)
*/
