2020.11.23 Class Notes (Graph Data Analysis with Spark GraphX)
Understanding basic graph concepts and terminology
A graph is a network-like data structure consisting of a set of nodes (vertices) and a set of relationships (edges) between those nodes.
Graph terminology
Vertex
Edge
A graph is usually written as the 2-tuple Graph = (V, E)
Vertex set V = {v1, v2, v3}
Edge set E = {(v1,v2), (v1,v3), (v2,v3)}

Directed graph
G=(V,E)
V={A,B,C,D,E}
E={<A,B>,<B,C>,<B,D>,<C,E>,<D,A>,<E,D>}

Undirected graph
G=(V,E)
V={A,B,C,D,E}
E={(A,B),(A,D),(B,C),(B,D),(C,E),(D,E)}

Cyclic graph
Contains at least one cycle: a closed path of connected vertices that returns to its starting vertex

Acyclic graph
A graph with no cycles; a DAG is a directed acyclic graph

Degree
In graph theory, the degree of a vertex is the total number of edges connected to it.
In a directed graph, the out-degree of a vertex is the number of directed edges leaving it toward other vertices, and the in-degree is the number of directed edges pointing into it from other vertices, as illustrated by the sketch below.
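As a quick illustration, here is a minimal plain-Scala sketch (a hypothetical example, not part of the course code) that counts the out-degree and in-degree of each vertex of the directed graph G shown above:
// Edges of the directed graph G = (V, E) from the example above
val edges = Seq(("A","B"), ("B","C"), ("B","D"), ("C","E"), ("D","A"), ("E","D"))
// out-degree: number of edges leaving each vertex
val outDegree = edges.groupBy(_._1).map { case (v, es) => v -> es.size }
// in-degree: number of edges entering each vertex
val inDegree = edges.groupBy(_._2).map { case (v, es) => v -> es.size }
println(outDegree) // e.g. Map(A -> 1, B -> 2, C -> 1, D -> 1, E -> 1) (map order may vary)
println(inDegree)  // e.g. Map(A -> 1, B -> 1, C -> 1, D -> 2, E -> 1) (map order may vary)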

Classic graph representation
Adjacency matrix

1. Rows and columns both correspond to vertices; for each edge from the row vertex to the column vertex, the corresponding cell is set to 1 (all other cells are 0).
2. A vertex's degree can then be read off quickly from its row or column: in a directed graph the row sum is the out-degree and the column sum is the in-degree (a minimal sketch follows this list).
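The sketch below (a hypothetical plain-Scala example, not part of the course code) builds the adjacency matrix for the directed graph G above and reads the degrees off the row and column sums:
// Adjacency matrix of the directed graph G = (V, E) from the example above
val vs = Vector("A", "B", "C", "D", "E")
val es = Seq(("A","B"), ("B","C"), ("B","D"), ("C","E"), ("D","A"), ("E","D"))
val idx = vs.zipWithIndex.toMap
// matrix(i)(j) == 1 when there is an edge from vs(i) to vs(j)
val matrix = Array.ofDim[Int](vs.size, vs.size)
es.foreach { case (src, dst) => matrix(idx(src))(idx(dst)) = 1 }
// row sum = out-degree, column sum = in-degree
val outDegree = vs.indices.map(i => vs(i) -> matrix(i).sum)
val inDegree  = vs.indices.map(j => vs(j) -> vs.indices.map(i => matrix(i)(j)).sum)
println(outDegree) // Vector((A,1), (B,2), (C,1), (D,1), (E,1))
println(inDegree)  // Vector((A,1), (B,1), (C,1), (D,2), (E,1))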
Typical application scenarios
Finding the shortest path in a map application
Relationships in a social network
Hyperlink relationships between web pages
Understanding the Spark GraphX data model
Introduction to Spark GraphX
GraphX is the distributed graph-computation API provided by Spark.
Its main features include:
In-memory computation, so graph data can be shared and reused efficiently;
A resilient, elastically scalable distributed Property Graph that manages different views of the same data in a unified way;
Seamless integration with the other mainstream Spark components such as Streaming, SQL, and the ML libraries.
GraphX core abstraction
The core abstraction is the Resilient Distributed Property Graph, a resilient distributed graph structure in which every vertex and every edge carries its own attribute (property) information.


One physical storage, two views

Every operation on the Graph view is translated into RDD operations on its underlying Table view.
Mastering the Spark GraphX API
Creating a Graph[VD, ED] instance
// 1. Import the Spark GraphX package
scala> import org.apache.spark.graphx._
// 2. Create the vertices RDD
scala> val vertices=sc.makeRDD(Seq( (1L,1),(2L,2),(3L,3) ))
vertices: org.apache.spark.rdd.RDD[(Long, Int)] = ParallelCollectionRDD[0] at makeRDD at <console>:27
// 3. Create the edges RDD
scala> val edges=sc.makeRDD(Seq( Edge(1L,2L,1),Edge(2L,3L,2) ))
edges: org.apache.spark.rdd.RDD[org.apache.spark.graphx.Edge[Int]] = ParallelCollectionRDD[1] at makeRDD at <console>:27
// 4. Create the Graph object
scala> val graph = Graph(vertices,edges)
graph: org.apache.spark.graphx.Graph[Int,Int] = org.apache.spark.graphx.impl.GraphImpl@6a2d066d
Information that can be inspected once the graph object exists:
// Get the graph's vertex information
scala> graph.vertices.collect
res0: Array[(org.apache.spark.graphx.VertexId, Int)] = Array((1,1), (3,3), (2,2))
// Get the graph's edge information
scala> graph.edges.collect
res1: Array[org.apache.spark.graphx.Edge[Int]] = Array(Edge(1,2,1), Edge(2,3,2))
// Get the graph's triplets; each triplet also carries the full attributes of its source and destination vertices
scala> graph.triplets.collect
res2: Array[org.apache.spark.graphx.EdgeTriplet[Int,Int]] = Array(((1,1),(2,2),1), ((2,2),(3,3),2))
To write the program in IDEA, add the following dependency to the Maven project:
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-graphx_2.11</artifactId>
<version>2.1.1</version>
</dependency>
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object GraphX01 {
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("graphx")
val sc = new SparkContext(conf)
sc.setLogLevel("ERROR")
import org.apache.spark.graphx._
val vertices: RDD[(VertexId, PartitionID)] = sc.makeRDD(Seq( (1L,1),(2L,2),(3L,3) ))
val edges: RDD[Edge[PartitionID]] = sc.makeRDD(Seq( Edge(1L,2L,1),Edge(2L,3L,2) ))
val graph: Graph[PartitionID, PartitionID] = Graph(vertices,edges)
graph.edges.collect().foreach(println)
/*Edge(1,2,1)
Edge(2,3,2)*/
graph.vertices.collect().foreach(println)
/*(1,1)
(2,2)
(3,3)*/
graph.triplets.collect().foreach(println)
/*((1,1),(2,2),1)
((2,2),(3,3),2)*/
}
}
Creating a graph object by loading a file
[root@hadoop100 kb09file]# vi followers.txt
[root@hadoop100 kb09file]# cat followers.txt
2 3
3 4
1 4
2 4
// Load an edge-list file to build the graph; each line describes one edge in the format: srcId dstId. Vertex and edge attributes all default to 1
scala> val graphLoad = GraphLoader.edgeListFile(sc,"file:///opt/kb09file/followers.txt")
graphLoad: org.apache.spark.graphx.Graph[Int,Int] = org.apache.spark.graphx.impl.GraphImpl@27870c76
// Vertex information
scala> graphLoad.vertices.collect
res5: Array[(org.apache.spark.graphx.VertexId, Int)] = Array((4,1), (1,1), (3,1), (2,1))
// Edge information
scala> graphLoad.edges.collect
res6: Array[org.apache.spark.graphx.Edge[Int]] = Array(Edge(1,4,1), Edge(2,3,1), Edge(2,4,1), Edge(3,4,1))
// Triplet information
scala> graphLoad.triplets.collect
res7: Array[org.apache.spark.graphx.EdgeTriplet[Int,Int]] = Array(((1,1),(4,1),1), ((2,1),(3,1),1), ((2,1),(4,1),1), ((3,1),(4,1),1))
Implementation in IDEA:
import org.apache.spark.{SparkConf, SparkContext}
object GraphX02 {
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("graphx")
val sc = new SparkContext(conf)
sc.setLogLevel("ERROR")
import org.apache.spark.graphx._
val graph: Graph[PartitionID, PartitionID] = GraphLoader.edgeListFile(sc,"in/followers.txt")
graph.edges.collect().foreach(println)
/*Edge(2,3,1)
Edge(3,4,1)
Edge(1,4,1)
Edge(2,4,1)*/
graph.vertices.collect().foreach(println)
/*(4,1)
(2,1)
(1,1)
(3,1)*/
graph.triplets.collect().foreach(println)
/*((2,1),(3,1),1)
((3,1),(4,1),1)
((1,1),(4,1),1)
((2,1),(4,1),1)*/
}
}
Case study 1:

Build a property graph of user collaboration relationships
Vertex attributes: user name and occupation
Edge attribute: the collaboration relationship
// 1. Create the vertices RDD
scala> val users = sc.parallelize(Array( (3L,("rxin","student")),(7L,("jgonzal","postdoc")),(5L,("franklin","professor")),(2L,("istoica","professor")) ))
users: org.apache.spark.rdd.RDD[(Long, (String, String))] = ParallelCollectionRDD[62] at parallelize at <console>:27
// 2. Create the edges RDD
scala> val relationship = sc.parallelize(Array( Edge(3L,7L,"Colla"),Edge(5L,3L,"Advisor"),Edge(2L,5L,"Colleague"),Edge(5L,7L,"PI") ))
relationship: org.apache.spark.rdd.RDD[org.apache.spark.graphx.Edge[String]] = ParallelCollectionRDD[63] at parallelize at <console>:27
// 3. Create the Graph object
scala> val graphUser= Graph(users,relationship)
graphUser: org.apache.spark.graphx.Graph[(String, String),String] = org.apache.spark.graphx.impl.GraphImpl@57f1635f
Inspect the graph object:
// Vertex information
scala> graphUser.vertices.collect
res8: Array[(org.apache.spark.graphx.VertexId, (String, String))] = Array((3,(rxin,student)), (7,(jgonzal,postdoc)), (5,(franklin,professor)), (2,(istoica,professor)))
// Edge information
scala> graphUser.edges.collect
res9: Array[org.apache.spark.graphx.Edge[String]] = Array(Edge(2,5,Colleague), Edge(3,7,Colla), Edge(5,3,Advisor), Edge(5,7,PI))
// Triplet information
scala> graphUser.triplets.collect
res10: Array[org.apache.spark.graphx.EdgeTriplet[(String, String),String]] = Array(((2,(istoica,professor)),(5,(franklin,professor)),Colleague), ((3,(rxin,student)),(7,(jgonzal,postdoc)),Colla), ((5,(franklin,professor)),(3,(rxin,student)),Advisor), ((5,(franklin,professor)),(7,(jgonzal,postdoc)),PI))
Case study 2:
Build a user social-network graph
Vertex attributes: account name and age
Edge attribute: interaction frequency (number of "calls")
Filter out the more active accounts

Assume that more than 5 calls means true love; find those pairs.

// 1. Create the vertices RDD
scala> val userRdd = sc.parallelize(Array( (1L,("alice",28)),(2L,("Bob",27)),(3L,("Charlie",65)),(4L,("David",42)),(5L,("Ed",55)),(6L,("Fran",50)) ))
// 2. Create the edges RDD
scala> val usercallRdd = sc.parallelize(Array( Edge(2L,1L,7),Edge(2L,4L,2),Edge(3L,2L,4),Edge(3L,6L,3),Edge(4L,1L,1),Edge(5L,2L,2),Edge(5L,3L,8),Edge(5L,6L,3) ))
// 3. Create the Graph object
scala> val userCallGraph= Graph(userRdd,usercallRdd)
userCallGraph: org.apache.spark.graphx.Graph[(String, Int),Int] = org.apache.spark.graphx.impl.GraphImpl@1a9732d1
Filter out the users older than 30:
scala> userCallGraph.vertices.filter(v=>v._2._2>30).collect.foreach(println)
(4,(David,42))
(6,(Fran,50))
(3,(Charlie,65))
(5,(Ed,55))
scala> userCallGraph.vertices.filter(v=>v._2._2>30).collect.foreach(x=>println("name:"+x._2._1+" age:"+x._2._2))
name:David age:42
name:Fran age:50
name:Charlie age:65
name:Ed age:55
scala> userCallGraph.vertices.filter{ case (id,(name,age)) => age>30 }.collect.foreach(println)
(4,(David,42))
(6,(Fran,50))
(3,(Charlie,65))
(5,(Ed,55))
Assume that more than 5 calls means true love; find those pairs:
scala> userCallGraph.triplets
res19: org.apache.spark.rdd.RDD[org.apache.spark.graphx.EdgeTriplet[(String, Int),Int]] = MapPartitionsRDD[113] at mapPartitions at GraphImpl.scala:48
scala> userCallGraph.triplets.collect.foreach(println)
((2,(Bob,27)),(1,(alice,28)),7)
((2,(Bob,27)),(4,(David,42)),2)
((3,(Charlie,65)),(2,(Bob,27)),4)
((3,(Charlie,65)),(6,(Fran,50)),3)
((4,(David,42)),(1,(alice,28)),1)
((5,(Ed,55)),(2,(Bob,27)),2)
((5,(Ed,55)),(3,(Charlie,65)),8)
((5,(Ed,55)),(6,(Fran,50)),3)
// Show the number of calls for every pair
scala> userCallGraph.triplets.collect.foreach(x=>println(x.srcAttr._1 +"喜欢"+ x.dstAttr._1 + "喜欢指数:" + x.attr) )
Bob喜欢alice喜欢指数:7
Bob喜欢David喜欢指数:2
Charlie喜欢Bob喜欢指数:4
Charlie喜欢Fran喜欢指数:3
David喜欢alice喜欢指数:1
Ed喜欢Bob喜欢指数:2
Ed喜欢Charlie喜欢指数:8
Ed喜欢Fran喜欢指数:3
// Keep only the pairs with more than 5 calls
scala> userCallGraph.triplets.filter(x=>x.attr>5).collect.foreach(x=>println(x.srcAttr._1 +"喜欢"+ x.dstAttr._1 + "喜欢指数:" + x.attr) )
Bob喜欢alice喜欢指数:7
Ed喜欢Charlie喜欢指数:8
Implementation in IDEA:
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object GraphX03 {
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("graphx")
val sc = new SparkContext(conf)
sc.setLogLevel("ERROR")
import org.apache.spark.graphx._
val userRdd: RDD[(VertexId, (String, PartitionID))] = sc.parallelize(
Array( (1L,("alice",28))
,(2L,("Bob",27))
,(3L,("Charlie",65))
,(4L,("David",42))
,(5L,("Ed",55))
,(6L,("Fran",50))
))
val usercallRdd: RDD[Edge[PartitionID]] = sc.parallelize(
Array(
Edge(2L,1L,7)
,Edge(2L,4L,2)
,Edge(3L,2L,4)
,Edge(3L,6L,3)
,Edge(4L,1L,1)
,Edge(5L,2L,2)
,Edge(5L,3L,8)
,Edge(5L,6L,3)
))
val userCallGraph: Graph[(String, PartitionID), PartitionID] = Graph(userRdd,usercallRdd)
userCallGraph.vertices.filter(x=>x._2._2>30).collect().foreach(x=>println("name: " + x._2._1 +",age: "+x._2._2))
/*name: Charlie,age: 65
name: David,age: 42
name: Ed,age: 55
name: Fran,age: 50*/
userCallGraph.vertices.filter{ case (id,(name,age)) => age>30 }.collect().foreach(println)
/*(3,(Charlie,65))
(4,(David,42))
(5,(Ed,55))
(6,(Fran,50))*/
userCallGraph.triplets.collect().foreach(println) //RDD[EdgeTriplet[(String, PartitionID), PartitionID]]
/*((2,(Bob,27)),(1,(alice,28)),7)
((2,(Bob,27)),(4,(David,42)),2)
((3,(Charlie,65)),(2,(Bob,27)),4)
((3,(Charlie,65)),(6,(Fran,50)),3)
((4,(David,42)),(1,(alice,28)),1)
((5,(Ed,55)),(2,(Bob,27)),2)
((5,(Ed,55)),(3,(Charlie,65)),8)
((5,(Ed,55)),(6,(Fran,50)),3)*/
userCallGraph.triplets.filter(x=>x.attr>5).collect.foreach(x=>println(x.srcAttr._1 +"喜欢"+ x.dstAttr._1 + "喜欢指数:" + x.attr) )
/*Bob喜欢alice喜欢指数:7
Ed喜欢Charlie喜欢指数:8*/
userCallGraph.edges.filter{case Edge(src,dst,attr) => attr>5}.collect().foreach(println)
/*Edge(2,1,7)
Edge(5,3,8)*/
}
}
Mastering Spark GraphX graph algorithms (a short sketch of both algorithms follows the list below)
PageRank
Connected Component
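The notes do not include code for these two algorithms, so here is a short sketch assuming the graphLoad object built earlier from followers.txt; pageRank and connectedComponents are standard GraphX operators, but the parameter values here are only illustrative:
// PageRank: returns a new graph whose vertex attribute is the rank (0.0001 is the convergence tolerance)
val ranks = graphLoad.pageRank(0.0001).vertices
ranks.sortBy(_._2, ascending = false).collect.foreach(println)
// Connected Components: each vertex is labelled with the smallest vertex id in its component
val cc = graphLoad.connectedComponents().vertices
cc.collect.foreach(println)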
Mastering Spark GraphX Pregel
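Pregel is GraphX's bulk-synchronous message-passing API, invoked as graph.pregel(initialMsg)(vprog, sendMsg, mergeMsg). The notes do not include a Pregel example, so below is only a sketch of the classic single-source shortest-path computation, assuming the userCallGraph built above, with the call count treated as the edge weight and vertex 5 (Ed) chosen as the source:
// Single-source shortest paths with Pregel, starting from vertex 5 (Ed)
val sourceId: VertexId = 5L
// initialise the source with distance 0.0 and every other vertex with infinity
val initialGraph = userCallGraph.mapVertices((id, _) => if (id == sourceId) 0.0 else Double.PositiveInfinity)
val sssp = initialGraph.pregel(Double.PositiveInfinity)(
  (id, dist, newDist) => math.min(dist, newDist),        // vprog: keep the smaller distance
  triplet => {                                           // sendMsg: propagate only improving distances
    if (triplet.srcAttr + triplet.attr < triplet.dstAttr)
      Iterator((triplet.dstId, triplet.srcAttr + triplet.attr))
    else
      Iterator.empty
  },
  (a, b) => math.min(a, b)                               // mergeMsg: keep the smaller of two messages
)
sssp.vertices.collect.foreach(println)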
Classroom exercise: identify the social-media influencers. Requirements:
The data file is twitter-graph-data.txt.
Each record consists of two tuples of the form (username, userId); the first tuple is the followee (the user being followed) and the second is the follower.
Build the social-network graph and count the number of followers of each user.
Finally, identify the most popular influencer.
((User47,86566510),(User83,15647839))
((User47,86566510),(User42,197134784))
((User89,74286565),(User49,19315174))
((User16,22679419),(User69,45705189))
((User37,14559570),(User64,24742040))
((User31,63644892),(User10,123004655))
((User10,123004655),(User50,17613979))
((User37,14559570),(User11,14269220))
((User78,3365291),(User30,93905958))
((User14,199097645),(User60,16547411))
((User3,14874480),(User42,197134784))
((User40,813286),(User9,15434432))
((User10,123004655),(User34,10211502))
((User90,34870269),(User53,25566593))
((User12,24741956),(User60,16547411))
((User12,24741956),(User5,18927441))
((User37,14559570),(User39,22831657))
((User89,74286565),(User32,15670515))
((User28,7465732),(User65,90569268))
((User89,74286565),(User9,15434432))
------ only the first 20 records of the data file are shown ----------------
Code implementation; a regular expression is used to parse each record
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import scala.util.matching.Regex
object Demo08 {
def main(args: Array[String]): Unit = {
val spark: SparkSession = SparkSession.builder()
.appName(this.getClass.getName)
.master("local[4]")
.getOrCreate()
val sc: SparkContext = spark.sparkContext
sc.setLogLevel("ERROR")
import org.apache.spark.graphx._
// Raw record: ((User47,86566510),(User83,15647839))
// Regular expression used to extract the two fields:
// \(\((User\d+,\d+)\),\((User\d+,\d+)\)\)
// The regex yields two capture groups; each group is split on "," below
val r: Regex = """\(\((User\d+,\d+)\),\((User\d+,\d+)\)\)""".r
// Read the data file
val twitters: RDD[(Option[String], Option[String])] = sc.textFile("data/twitter_graph_data.txt")
.map(x => x match {
case r(followee, follower) => (Some(followee), Some(follower))
case _ => (None, None)
}).filter(x => x._1 != None && x._2 != None)
// flatMap flattens each record into its two (vertexId, name) vertices
// A union of two separate RDDs (followees and followers) would also work; not demonstrated here
val verts: RDD[(VertexId, String)] = twitters.flatMap(x => {
val str1: Array[String] = x._1.get.split(",")
val str2: Array[String] = x._2.get.split(",")
Array((str1(1).toLong, str1(0)), (str2(1).toLong, str2(0)))
}).distinct()
val edges: RDD[Edge[String]] = twitters.map(x => {
val str1: Array[String] = x._1.get.split(",")
val str2: Array[String] = x._2.get.split(",")
Edge(str2(1).toLong, str1(1).toLong,"follow")
})
val graph: Graph[String, String] = Graph(verts,edges,"")
// Compute the in-degree (number of followers) of each vertex and sort by in-degree in descending order
// Note: for a globally ordered printout, first repartition into a single partition
graph.inDegrees.repartition(1).sortBy(_._2, ascending = false).foreach(println)
}
}
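To go one step further and report the most popular influencer by name, not just the sorted in-degree counts, something like the following could be appended at the end of main; this is only a sketch building on the verts and graph values defined above:
// Join each user name with its in-degree (number of followers) and pick the most-followed user
val top = verts.join(graph.inDegrees)          // (vertexId, (userName, inDegree))
  .sortBy(_._2._2, ascending = false)
  .first()
println(s"Most popular user: ${top._2._1} with ${top._2._2} followers")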
