Spark GraphX Connected Components Algorithm: A Usage Example
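GraphX is Spark's graph-computation component. It provides a rich set of graph operators and ships commonly used graph algorithms (in the GraphX lib package). This article introduces the Connected Components algorithm and walks through an example: the code reads edge and vertex data from files, runs the algorithm, and processes the result. The run finds three connected subgraphs and, for each one, reports its smallest vertex ID and its set of vertices.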
1. Background
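For the basic graph-theory concepts, see the reference article "Basic Concepts of Graphs: graphs, connected graphs, complete graphs, cliques, networks, and subgraphs, with examples".
A connected component is a maximal connected subgraph: any two of its vertices are joined by a path, and no further vertex can be added without breaking that property. The graph in the figure below has 3 maximal connected subgraphs (connected components).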

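To make the definition concrete, here is a minimal Scala sketch (not part of the original example) that finds connected components with a breadth-first search: everything reachable from an unvisited vertex, following edges in either direction, forms one maximal connected subgraph. It assumes the whole edge list fits in memory; the object name BfsComponents and the toy edges are illustrative only.

```scala
import scala.collection.mutable

object BfsComponents {
  /** Hypothetical helper: splits an undirected graph into its connected components via BFS. */
  def components(edges: Seq[(Long, Long)]): Seq[Set[Long]] = {
    // Undirected adjacency list: each edge can be followed in both directions.
    val adj = mutable.Map.empty[Long, mutable.Set[Long]]
    for ((a, b) <- edges) {
      adj.getOrElseUpdate(a, mutable.Set.empty[Long]) += b
      adj.getOrElseUpdate(b, mutable.Set.empty[Long]) += a
    }
    val visited = mutable.Set.empty[Long]
    val result = mutable.ArrayBuffer.empty[Set[Long]]
    for (start <- adj.keys.toSeq.sorted if !visited(start)) {
      // Everything reachable from `start` belongs to one maximal connected subgraph.
      val component = mutable.Set(start)
      val queue = mutable.Queue(start)
      visited += start
      while (queue.nonEmpty) {
        val v = queue.dequeue()
        for (n <- adj(v) if !visited(n)) {
          visited += n
          component += n
          queue.enqueue(n)
        }
      }
      result += component.toSet
    }
    result.toSeq
  }

  def main(args: Array[String]): Unit = {
    val edges = Seq((1L, 2L), (2L, 3L), (4L, 5L))
    // Prints one line per component, vertices sorted.
    components(edges).foreach(c => println(c.toSeq.sorted.mkString(",")))
  }
}
```

For the toy edges this prints 1,2,3 and then 4,5: two components, because no path links {1, 2, 3} to {4, 5}.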
2. Computing the Connected Subgraphs of a Graph
connectedComponents computes the connected-component membership of each vertex and returns a graph in which each vertex's value is the smallest vertex ID in the connected component containing that vertex. That smallest ID therefore acts as the identifier of the component.
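From the result of connectedComponents you can tell which vertices lie in the same connected component, which lets you split a large graph into several connected subgraphs.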
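The meaning of the result can be illustrated without Spark. The sketch below is a sequential, in-memory approximation, not GraphX's code: every vertex starts labelled with its own ID, and the smaller label is repeatedly pushed across each edge in both directions (edge direction is ignored, as in connectedComponents) until nothing changes. At that point every vertex carries the smallest ID in its component. GraphX's own implementation (org.apache.spark.graphx.lib.ConnectedComponents) spreads the same minimum-ID idea over Pregel supersteps on distributed RDDs. The object name MinLabelPropagation is hypothetical; the edges are those of edges.txt in section 2.2.

```scala
object MinLabelPropagation {
  // (src, dst) pairs from edges.txt; the label column is ignored.
  val edges: Seq[(Long, Long)] = Seq(
    (1L, 3L), (2L, 3L), (1L, 2L), (3L, 5L), (3L, 4L), (4L, 5L),
    (6L, 8L), (7L, 8L), (1L, 8L), (6L, 7L), (8L, 3L), (8L, 5L),
    (9L, 12L), (10L, 12L), (11L, 12L), (9L, 10L), (9L, 11L), (13L, 14L))

  /** Returns vertexId -> smallest vertex ID in its component (edges treated as undirected). */
  def components(edges: Seq[(Long, Long)]): Map[Long, Long] = {
    // Every vertex starts labelled with its own ID.
    var labels: Map[Long, Long] =
      edges.flatMap { case (s, d) => Seq(s -> s, d -> d) }.toMap
    var changed = true
    while (changed) {
      changed = false
      // Push the smaller of the two endpoint labels across every edge.
      for ((s, d) <- edges) {
        val m = math.min(labels(s), labels(d))
        if (labels(s) != m) { labels = labels.updated(s, m); changed = true }
        if (labels(d) != m) { labels = labels.updated(d, m); changed = true }
      }
    }
    labels
  }

  def main(args: Array[String]): Unit = {
    val cc = components(edges)
    // Group vertices by component label, smallest label first.
    cc.groupBy(_._2).toSeq.sortBy(_._1).foreach { case (label, members) =>
      println(s"$label -> ${members.keys.toSeq.sorted.mkString(",")}")
    }
  }
}
```

Running main prints 1 -> 1,2,3,4,5,6,7,8, then 9 -> 9,10,11,12, then 13 -> 13,14, the same three components that the Spark run reports in section 2.3.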
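The following source code computes the vertex set of each connected subgraph in a graph. It loads the edges with a modified copy of GraphX's edge-list loader so that comma-separated edge files can be read.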
2.1. Source Code
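// The example lives in a subpackage of org.apache.spark.graphx so that it can use
// package-private internals: EdgePartitionBuilder is private[graphx] and Logging is private[spark].
package org.apache.spark.graphx.test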
import org.apache.spark.SparkContext
import org.apache.spark.graphx.Graph
import org.apache.spark.graphx.impl.{EdgePartitionBuilder, GraphImpl}
import org.apache.spark.internal.Logging
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.apache.spark.storage.StorageLevel
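object ConnectedComponentsExample extends Logging {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder
      .master("local[4]")
      .appName(s"${this.getClass.getSimpleName}")
      .getOrCreate()
    val sc = spark.sparkContext
    // Load "src,dst,label" lines; columns 0 and 1 hold the vertex IDs.
    val graph = edgeListFile(sc, "data/graphx/edges.txt", srcIndex = 0, destIndex = 1, numEdgePartitions = 4)
    // Each vertex attribute becomes the smallest vertex ID in its connected component.
    val cc = graph.connectedComponents()
    // Vertex names from "id|name|type" lines. split takes a regex, so the pipe must be escaped.
    val users: RDD[(Long, String)] = sc.textFile("data/graphx/vertexs.txt")
      .filter(_.trim.nonEmpty)
      .map { line =>
        val fields = line.split("\\|")
        (fields(0).trim.toLong, fields(1))
      }
    // Vertex attribute becomes (componentId, name); a vertex without a name gets "".
    val newGraph = cc.outerJoinVertices(users)((vid, vd, p) => (vd, p.getOrElse("")))
    // For each distinct component ID, keep only that component's vertices and print their names.
    cc.vertices.map(_._2).distinct().collect().sorted.foreach { id =>
      val sub = newGraph.subgraph(vpred = (vid, vd) => vd._1 == id).mapVertices((vid, vd) => vd._2)
      println("Smallest vertex ID in the connected component: %s, vertex set: [%s]".format(id,
        sub.vertices.map(v => v._2).collect().mkString(",")))
    }
    spark.stop()
  }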
  // Adapted from GraphX's GraphLoader.edgeListFile: lines are split on ',' instead of
  // whitespace, and srcIndex/destIndex choose which columns hold the vertex IDs.
  // Other columns (such as the edge label) are ignored and every edge gets attribute 1.
  def edgeListFile(
      sc: SparkContext,
      path: String,
      canonicalOrientation: Boolean = false,
      srcIndex: Int = 0,
      destIndex: Int = 1,
      numEdgePartitions: Int = -1,
      edgeStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY,
      vertexStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY)
    : Graph[Int, Int] = {
    val startTime = System.currentTimeMillis
    // A valid line must contain both index positions.
    val maxIndex = (if (srcIndex > destIndex) srcIndex else destIndex) + 1
    val lines =
      if (numEdgePartitions > 0) {
        sc.textFile(path, numEdgePartitions).coalesce(numEdgePartitions)
      } else {
        sc.textFile(path)
      }
    // Build one EdgePartition per input partition, skipping blank and '#' comment lines.
    val edges = lines.mapPartitionsWithIndex { (pid, iter) =>
      val builder = new EdgePartitionBuilder[Int, Int]
      iter.foreach { line =>
        if (!line.isEmpty && line(0) != '#') {
          val lineArray = line.split(",")
          if (lineArray.length < maxIndex) {
            throw new IllegalArgumentException("Invalid line: " + line)
          }
          val srcId = lineArray(srcIndex).toLong
          val dstId = lineArray(destIndex).toLong
          // With canonicalOrientation, the smaller ID is always stored as the source.
          if (canonicalOrientation && srcId > dstId) {
            builder.add(dstId, srcId, 1)
          } else {
            builder.add(srcId, dstId, 1)
          }
        }
      }
      Iterator((pid, builder.toEdgePartition))
    }.persist(edgeStorageLevel).setName("GraphLoader.edgeListFile - edges (%s)".format(path))
    edges.count() // materialize the edges so the load time logged below is meaningful
    logInfo("It took %d ms to load the edges".format(System.currentTimeMillis - startTime))
    // Vertices are created from the edge endpoints, each with default attribute 1.
    GraphImpl.fromEdgePartitions(edges, defaultVertexAttr = 1, edgeStorageLevel = edgeStorageLevel,
      vertexStorageLevel = vertexStorageLevel)
  }
}
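The loop in main calls subgraph and then collect once per component, so one Spark job runs for every component. That is fine for three components but grows with the component count. An alternative is a single grouping pass keyed by component ID. The sketch below shows that grouping logic on plain Scala maps; with RDDs, an analogous route would be joining cc.vertices with users and calling groupByKey on (componentId, name) pairs. GroupByComponent and group are hypothetical names, and the sample maps are taken from the data in section 2.2.

```scala
object GroupByComponent {
  /**
   * Hypothetical helper: given vertexId -> componentId and vertexId -> name,
   * returns componentId -> sorted names, i.e. every component's vertex set in one pass.
   */
  def group(cc: Map[Long, Long], names: Map[Long, String]): Map[Long, Seq[String]] =
    cc.toSeq
      .map { case (vid, cid) => (cid, names.getOrElse(vid, "")) } // missing name -> ""
      .groupBy(_._1)
      .map { case (cid, pairs) => cid -> pairs.map(_._2).sorted }

  def main(args: Array[String]): Unit = {
    val cc = Map(9L -> 9L, 12L -> 9L, 13L -> 13L, 14L -> 13L)
    val names = Map(9L -> "javame", 12L -> "HugeGraph", 13L -> "java", 14L -> "James Gosling")
    // Print components in ascending order of their smallest vertex ID.
    group(cc, names).toSeq.sortBy(_._1).foreach { case (cid, ns) =>
      println(s"$cid: [${ns.mkString(",")}]")
    }
  }
}
```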
2.2. Data
vertexs.txt
1|okram|person
2|spmallette|person
3|TinkerPop|software
4|TinkerGraph|software
5|Gremlin|language
6|dalaro|person
7|mbroecheler|person
8|Titan|software
9|javame|person
10|zhoney|person
11|linary|person
12|HugeGraph|software
13|java|software
14|James Gosling|person
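Each line of vertexs.txt has the form id|name|type, and the example uses only the first two fields. Because String.split interprets its argument as a regular expression, the pipe has to be written as the Scala literal "\\|"; an unescaped "|" is an empty alternation that matches the empty string at every position. A small Spark-free sketch of the parsing rule follows; the helper parse is hypothetical.

```scala
object VertexLineParsing {
  /** Hypothetical helper: parses "id|name|type" into (id, name); blank lines yield None. */
  def parse(line: String): Option[(Long, String)] =
    if (line.trim.isEmpty) None
    else {
      // split takes a regex, so the pipe is escaped; the type field is ignored.
      val fields = line.split("\\|")
      Some((fields(0).trim.toLong, fields(1)))
    }

  def main(args: Array[String]): Unit = {
    println(parse("14|James Gosling|person")) // Some((14,James Gosling))
    // Unescaped, "|" matches the empty string, so the line is split into single characters.
    println("1|okram".split("|").mkString(" "))
  }
}
```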
edges.txt
1,3,created
2,3,created
1,2,knows
3,5,define
3,4,contains
4,5,supports
6,8,created
7,8,created
1,8,created
6,7,knows
8,3,implements
8,5,supports
9,12,created
10,12,created
11,12,created
9,10,knows
9,11,knows
13,14,created
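Each line of edges.txt has the form src,dst,label. The edgeListFile helper in section 2.1 reads only the columns at srcIndex and destIndex, so the label column (created, knows, and so on) is ignored and every edge gets attribute 1. The sketch below restates that per-line rule in plain Scala for quick testing; EdgeLineParsing is a hypothetical name, and require stands in for the explicit IllegalArgumentException thrown in the original helper.

```scala
object EdgeLineParsing {
  /**
   * Hypothetical helper restating edgeListFile's per-line rules: skip blank and '#' lines,
   * split on ',', read the IDs at srcIndex/destIndex, and ignore all other columns.
   */
  def parse(
      line: String,
      srcIndex: Int = 0,
      destIndex: Int = 1,
      canonicalOrientation: Boolean = false): Option[(Long, Long)] = {
    if (line.isEmpty || line(0) == '#') None
    else {
      val cols = line.split(",")
      // The line must contain both index positions.
      require(cols.length >= math.max(srcIndex, destIndex) + 1, s"Invalid line: $line")
      val src = cols(srcIndex).toLong
      val dst = cols(destIndex).toLong
      // With canonicalOrientation the smaller ID always becomes the source.
      Some(if (canonicalOrientation && src > dst) (dst, src) else (src, dst))
    }
  }

  def main(args: Array[String]): Unit = {
    println(parse("8,3,implements"))                              // Some((8,3))
    println(parse("8,3,implements", canonicalOrientation = true)) // Some((3,8))
    println(parse("# a comment line"))                            // None
  }
}
```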
2.3. Results
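Smallest vertex ID in the connected component: 1, vertex set: [TinkerGraph,Titan,okram,Gremlin,dalaro,spmallette,TinkerPop,mbroecheler]
Smallest vertex ID in the connected component: 9, vertex set: [HugeGraph,javame,zhoney,linary]
Smallest vertex ID in the connected component: 13, vertex set: [java,James Gosling]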
