Advertisement

spark graphx的Triangle三角形计数算法使用示例

阅读量:

GraphX中的TriangleCount算法用于计算图中三角形的数量,三角形是完全图中任意两点之间有边相连的结构。该算法通过Spark的图计算组件实现,能够高效处理大规模图数据。文本中详细介绍了三角形计数的原理、实现方法以及如何统计每个顶点的三角形数量,并展示了如何通过Gremlin语言加载数据、运行算法并输出结果。结果表明,顶点TinkerPop拥有最多的5个三角形,其他顶点如Titan、HugeGraph、Gremlin等也分别具有3个或2个三角形。

Graphx是Spark的图计算组件,支持丰富的图操作接口和基本算法(如graphx库中所包含)。本文将阐述TriangleCount算法的原理。

1. 相关知识

三角形:在全连接图(即每一对节点之间都存在一条边)中,网络中三角形的数量能够有效反映网络的稠密程度及其质量特征。

三角形计算:如果一条边的两个顶点拥有共同邻居,那么共同邻居与这两个顶点将形成三角形结构;或者,当一个顶点有两个相邻顶点且它们之间存在一条边时,该顶点即为三角形的一部分。

该算法实现了TriangleCount功能,具体细节可在<>中进行分析。需要注意以下事项:

TriangleCount要求边缘是规范方向的,即所有边都要满足(' srcId < dstId ')

使用Graph.partitionBy进行分区的图形

如图所示,JavaMe与Zhone两人共同创建了HugeGraph系统,三个顶点形成了一个三角形结构。其中,图中三角形数量最多的5个顶点及其数量如下:

(TinkerPop,5)

(Titan,3)

(HugeGraph,3)

(Gremlin,3)

(okram,2)

2. 统计顶点的三角形数

如何统计顶点的三角形数量,并取数量最多的N条

该代码块基于Gremlin语言,为每个顶点统计其参与的三角形数量,并筛选出参与三角形数量最多的前5个顶点。

图的数据结构定义及相关内容,建议您参考:HugeGraph图数据库系统架构解析基于Gremlin语言的图数据库构建与应用。这些资源将帮助您深入理解图数据库的构建与应用方法。

2.1. 源代码

复制代码
 package org.apache.spark.graphx.test

    
  
    
 import org.apache.spark.SparkContext
    
 import org.apache.spark.graphx.impl.{EdgePartitionBuilder, GraphImpl}
    
 import org.apache.spark.graphx.{Graph, PartitionStrategy}
    
 import org.apache.spark.internal.Logging
    
 import org.apache.spark.storage.StorageLevel
    
 import org.apache.spark.sql.SparkSession
    
  
    
 object TriangleCountingExample extends Logging {
    
   def main(args: Array[String]): Unit = {
    
     val spark = SparkSession
    
       .builder
    
       .master("local[4]")
    
       .appName(s"${this.getClass.getSimpleName}")
    
       .getOrCreate()
    
     val sc = spark.sparkContext
    
     
    
     val graph = edgeListFile(sc, "data/graphx/edges.txt", true, srcIndex = 0, destIndex = 1,numEdgePartitions = 4)
    
       .partitionBy(PartitionStrategy.EdgePartition2D)
    
     
    
     val triCounts = graph.triangleCount().vertices
    
     // Join the triangle counts with the usernames
    
     val users = sc.textFile("data/graphx/vertexs.txt").map { line =>
    
       if (!line.isEmpty) {
    
     val fields = line.split("\ |")
    
     (fields(0).toLong, fields(1))
    
       } else {
    
     (0L, "")
    
       }
    
     }
    
     val triCountByUsername = users.join(triCounts).map { case (_, (username, tc)) =>
    
       (username, tc)
    
     }.sortBy(_._2,false,1).take(5)
    
     println(triCountByUsername.mkString("\n"))
    
     spark.stop()
    
   }
    
  
    
   def edgeListFile(
    
                 sc: SparkContext,
    
                 path: String,
    
                 canonicalOrientation: Boolean = false,
    
                 srcIndex: Int = 0,
    
                 destIndex: Int = 1,
    
                 numEdgePartitions: Int = -1,
    
                 edgeStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY,
    
                 vertexStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY)
    
   : Graph[Int, Int] = {
    
     val startTime = System.currentTimeMillis
    
     val maxIndex = (if (srcIndex > destIndex) srcIndex else destIndex) + 1
    
  
    
     // Parse the edge data table directly into edge partitions
    
     val lines =
    
       if (numEdgePartitions > 0) {
    
     sc.textFile(path, numEdgePartitions).coalesce(numEdgePartitions)
    
       } else {
    
     sc.textFile(path)
    
       }
    
     val edges = lines.mapPartitionsWithIndex { (pid, iter) =>
    
       val builder = new EdgePartitionBuilder[Int, Int]
    
       iter.foreach { line =>
    
     if (!line.isEmpty && line(0) != '#') {
    
       val lineArray = line.split(",")
    
       if (lineArray.length < maxIndex) {
    
         throw new IllegalArgumentException("Invalid line: " + line)
    
       }
    
       val srcId = lineArray(srcIndex).toLong
    
       val dstId = lineArray(destIndex).toLong
    
       if (canonicalOrientation && srcId > dstId) {
    
         builder.add(dstId, srcId, 1)
    
       } else {
    
         builder.add(srcId, dstId, 1)
    
       }
    
     }
    
       }
    
       Iterator((pid, builder.toEdgePartition))
    
     }.persist(edgeStorageLevel).setName("GraphLoader.edgeListFile - edges (%s)".format(path))
    
     edges.count()
    
  
    
     logInfo("It took %d ms to load the edges".format(System.currentTimeMillis - startTime))
    
  
    
     GraphImpl.fromEdgePartitions(edges, defaultVertexAttr = 1, edgeStorageLevel = edgeStorageLevel,
    
       vertexStorageLevel = vertexStorageLevel)
    
   }
    
 }

2.2 数据

复制代码
 vertexs.txt

    
 1|okram|person
    
 2|spmallette|person
    
 3|TinkerPop|software
    
 4|TinkerGraph|software
    
 5|Gremlin|lanuage
    
 6|dalaro|person
    
 7|mbroecheler|person
    
 8|Titan|software
    
 9|javame|person
    
 10|zhoney|person
    
 11|linary|person
    
 12|HugeGraph|software
    
  
    
 edges.txt
    
 1,3,created
    
 2,3,created
    
 1,2,knows
    
 3,5,define
    
 3,4,contains
    
 4,5,supports
    
 6,8,created
    
 7,8,created
    
 1,8,created
    
 6,7,knows
    
 8,3,implements
    
 8,5,supports
    
 9,12,created
    
 10,12,created
    
 11,12,created
    
 9,10,knows
    
 9,11,knows
    
 12,3,implements
    
 12,5,supports

2.3. 结果

(TinkerPop,5)
(Titan,3)
(HugeGraph,3)
(Gremlin,3)
(okram,2)

全部评论 (0)

还没有任何评论哟~