spark graphx的Triangle三角形计数算法使用示例
GraphX中的TriangleCount算法用于计算图中三角形的数量,三角形是完全图中任意两点之间有边相连的结构。该算法通过Spark的图计算组件实现,能够高效处理大规模图数据。文本中详细介绍了三角形计数的原理、实现方法以及如何统计每个顶点的三角形数量,并展示了如何通过Gremlin语言加载数据、运行算法并输出结果。结果表明,顶点TinkerPop拥有最多的5个三角形,其他顶点如Titan、HugeGraph、Gremlin等也分别具有3个或2个三角形。
Graphx是Spark的图计算组件,支持丰富的图操作接口和基本算法(如graphx库中所包含)。本文将阐述TriangleCount算法的原理。
1. 相关知识
三角形:在全连接图(即每一对节点之间都存在一条边)中,网络中三角形的数量能够有效反映网络的稠密程度及其质量特征。
三角形计算:如果一条边的两个顶点拥有共同邻居,那么共同邻居与这两个顶点将形成三角形结构;或者,当一个顶点有两个相邻顶点且它们之间存在一条边时,该顶点即为三角形的一部分。
该算法实现了TriangleCount功能,具体细节可在<>中进行分析。需要注意以下事项:
TriangleCount要求边缘是规范方向的,即所有边都要满足(' srcId < dstId ')
使用Graph.partitionBy进行分区的图形
如图所示,JavaMe与Zhone两人共同创建了HugeGraph系统,三个顶点形成了一个三角形结构。其中,图中三角形数量最多的5个顶点及其数量如下:
(TinkerPop,5)
(Titan,3)
(HugeGraph,3)
(Gremlin,3)
(okram,2)

2. 统计顶点的三角形数
如何统计顶点的三角形数量,并取数量最多的N条
该代码块基于Gremlin语言,为每个顶点统计其参与的三角形数量,并筛选出参与三角形数量最多的前5个顶点。
图的数据结构定义及相关内容,建议您参考:HugeGraph图数据库系统架构解析 或 基于Gremlin语言的图数据库构建与应用。这些资源将帮助您深入理解图数据库的构建与应用方法。
2.1. 源代码
package org.apache.spark.graphx.test
import org.apache.spark.SparkContext
import org.apache.spark.graphx.impl.{EdgePartitionBuilder, GraphImpl}
import org.apache.spark.graphx.{Graph, PartitionStrategy}
import org.apache.spark.internal.Logging
import org.apache.spark.storage.StorageLevel
import org.apache.spark.sql.SparkSession
object TriangleCountingExample extends Logging {
def main(args: Array[String]): Unit = {
val spark = SparkSession
.builder
.master("local[4]")
.appName(s"${this.getClass.getSimpleName}")
.getOrCreate()
val sc = spark.sparkContext
val graph = edgeListFile(sc, "data/graphx/edges.txt", true, srcIndex = 0, destIndex = 1,numEdgePartitions = 4)
.partitionBy(PartitionStrategy.EdgePartition2D)
val triCounts = graph.triangleCount().vertices
// Join the triangle counts with the usernames
val users = sc.textFile("data/graphx/vertexs.txt").map { line =>
if (!line.isEmpty) {
val fields = line.split("\ |")
(fields(0).toLong, fields(1))
} else {
(0L, "")
}
}
val triCountByUsername = users.join(triCounts).map { case (_, (username, tc)) =>
(username, tc)
}.sortBy(_._2,false,1).take(5)
println(triCountByUsername.mkString("\n"))
spark.stop()
}
def edgeListFile(
sc: SparkContext,
path: String,
canonicalOrientation: Boolean = false,
srcIndex: Int = 0,
destIndex: Int = 1,
numEdgePartitions: Int = -1,
edgeStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY,
vertexStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY)
: Graph[Int, Int] = {
val startTime = System.currentTimeMillis
val maxIndex = (if (srcIndex > destIndex) srcIndex else destIndex) + 1
// Parse the edge data table directly into edge partitions
val lines =
if (numEdgePartitions > 0) {
sc.textFile(path, numEdgePartitions).coalesce(numEdgePartitions)
} else {
sc.textFile(path)
}
val edges = lines.mapPartitionsWithIndex { (pid, iter) =>
val builder = new EdgePartitionBuilder[Int, Int]
iter.foreach { line =>
if (!line.isEmpty && line(0) != '#') {
val lineArray = line.split(",")
if (lineArray.length < maxIndex) {
throw new IllegalArgumentException("Invalid line: " + line)
}
val srcId = lineArray(srcIndex).toLong
val dstId = lineArray(destIndex).toLong
if (canonicalOrientation && srcId > dstId) {
builder.add(dstId, srcId, 1)
} else {
builder.add(srcId, dstId, 1)
}
}
}
Iterator((pid, builder.toEdgePartition))
}.persist(edgeStorageLevel).setName("GraphLoader.edgeListFile - edges (%s)".format(path))
edges.count()
logInfo("It took %d ms to load the edges".format(System.currentTimeMillis - startTime))
GraphImpl.fromEdgePartitions(edges, defaultVertexAttr = 1, edgeStorageLevel = edgeStorageLevel,
vertexStorageLevel = vertexStorageLevel)
}
}
2.2 数据
vertexs.txt
1|okram|person
2|spmallette|person
3|TinkerPop|software
4|TinkerGraph|software
5|Gremlin|lanuage
6|dalaro|person
7|mbroecheler|person
8|Titan|software
9|javame|person
10|zhoney|person
11|linary|person
12|HugeGraph|software
edges.txt
1,3,created
2,3,created
1,2,knows
3,5,define
3,4,contains
4,5,supports
6,8,created
7,8,created
1,8,created
6,7,knows
8,3,implements
8,5,supports
9,12,created
10,12,created
11,12,created
9,10,knows
9,11,knows
12,3,implements
12,5,supports
2.3. 结果
(TinkerPop,5)
(Titan,3)
(HugeGraph,3)
(Gremlin,3)
(okram,2)
