spark进阶(九):GraphX使用
发布时间
阅读量:
阅读量
GraphX是Spark中的一个分布式图计算框架,是对Spark RDD的扩展。这里所说的图并不是图片,而是一个抽象的关系网。例如,社交应用微信、QQ、微博等用户之间的好友、关注等存在错综复杂的联系,这种联系构成了一张巨大的关系网,我们把这个关系网称为图。GraphX目前适用于微信、微博、社交网络、电子商务等类型的产品,也越来越多地应用于推荐领域的人群划分、年龄预测、标签推理等。
Vertices、edges、triplets是Spark GraphX中3个最重要的概念。
Vertices对应的RDD名称为VertexRDD,VertexRDD继承自RDD[(VertexId, VD) ],RDD的类型是VertexId和VD,其中VD是属性的类型,也就是说,VertexRDD有ID和顶点属性。
Edges对应的是EdgeRDD,EdgeRDD继承的RDD的类型是Edge[ED],属性有3个:源顶点的ID、目标顶点的ID、边属性。
Triplets的属性有源顶点ID、源顶点属性、边属性、目标顶点ID、目标顶点属性,Triplets其实是对Vertices和Edges做了Join操作
一、简单使用
其实顶点和边都是RDD,通过顶点和边之间的关系构建的图其三元组关系也是一个RDD,都适用RDD的一些操作
/** * @author: ffzs
* @Date: 2021/10/11 下午3:48
*/
object GraphX {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder
.appName("SparkGraphXExample")
.master("local[*]")
.getOrCreate()
val sc = spark.sparkContext
sc.setLogLevel("WARN")
val users:RDD[(VertexId, (String, String))] = sc.parallelize(Array(
(3L, ("zhangsan", "student")),
(2L, ("lisi", "prof")),
(5L, ("wangwu", "prof")),
(7L, ("zhaosi", "postdoc")),
))
val relationships: RDD[Edge[String]] = sc.parallelize(Array(
Edge(3L, 7L, "collab"),
Edge(5L, 3L, "advisor"),
Edge(2L, 5L, "colleague"),
Edge(5L, 7L, "pi"),
Edge(6L, 7L, "pi")
))
val defaultUser = ("ffzs", "Missing")
val graph = Graph(users, relationships, defaultUser)
// 图中获取顶点,进行过滤操作
val filteredVert = graph.vertices
.filter(_._2._2 == "postdoc")
.count
println(f"博士后节点的数量为: $filteredVert")
// 图中获取边,进行过滤操作
val filteredEdge = graph.edges
.filter(edge => edge.srcId > edge.dstId)
.count
println(f"途中边起始ID大于终止ID的数量为: $filteredEdge")
// 对图中的三元组进行操作 (start vertices, end vertices, relation)
graph.triplets.foreach(println)
// 计算每个顶点的入度
println("每个顶点的入度情况:")
val inDegrees = graph.inDegrees
inDegrees.foreach(println)
// 计算每个顶点的出度
println("每个顶点的出度情况:")
val outDegrees = graph.outDegrees
outDegrees.foreach(println)
// 计算每个顶点的度: 入度 + 出度
val degrees = graph.degrees
// 对图中所有边关系进行翻转
val reverseGraph = graph.reverse
// 通过subgraph对图进行过滤获得新的图
println("通过subgraph对图进行过滤获得新的图")
val subGraph = graph.subgraph(
vpred = (id, attr) => attr._2 == "prof"
)
subGraph.triplets.foreach(println)
// groupEdges 对相同起始终止的边进行合并
println("groupEdges 对相同起始终止的边进行合并")
val grouped = graph.groupEdges(merge = (a, b) => a + b)
grouped.edges.collect().foreach(println)
// mapVertices 更改顶点属性
println("通过 mapVertices 更改顶点属性")
graph.mapVertices((id, attr) => (attr._1, if (attr._2=="student") "student" else "teacher"))
.vertices
.foreach(println)
// mapEdges 更改顶点属性
println("通过 mapEdges 更改顶点属性")
graph.mapEdges(edge => if (edge.attr == "pi") "friend" else edge.attr)
.edges.foreach(println)
}
}
- 计算图中每一个顶点的入度和出度情况
- 对图进行一些过滤生成新的图
- 更改图中的属性
- 对图中重复边进行聚合
博士后节点的数量为: 1
途中边起始ID大于终止ID的数量为: 1
((5,(wangwu,prof)),(7,(zhaosi,postdoc)),pi)
((3,(zhangsan,student)),(7,(zhaosi,postdoc)),collab)
((2,(lisi,prof)),(5,(wangwu,prof)),colleague)
((5,(wangwu,prof)),(3,(zhangsan,student)),advisor)
((6,(ffzs,Missing)),(7,(zhaosi,postdoc)),pi)
每个顶点的入度情况:
(5,1)
(7,3)
(3,1)
每个顶点的出度情况:
(5,2)
(6,1)
(3,1)
(2,1)
通过subgraph对图进行过滤获得新的图
((2,(lisi,prof)),(5,(wangwu,prof)),colleague)
groupEdges 对相同起始终止的边进行合并
Edge(3,7,collab)
Edge(5,3,advisor)
Edge(2,5,colleague)
Edge(5,7,pi)
Edge(6,7,pi)
通过 mapVertices 更改顶点属性
(7,(zhaosi,teacher))
(2,(lisi,teacher))
(6,(ffzs,teacher))
(3,(zhangsan,student))
(5,(wangwu,teacher))
通过 mapEdges 更改顶点属性
Edge(5,7,friend)
Edge(5,3,advisor)
Edge(3,7,collab)
Edge(2,5,colleague)
Edge(6,7,friend)
全部评论 (0)
还没有任何评论哟~
