
Using the Connected Components algorithm in Spark GraphX: an example


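GraphX is Spark's graph computation component and provides a rich set of graph operations and commonly used algorithms. This article focuses on the Connected Components algorithm and shows how to use it with an example. The code walks through reading the data, running the algorithm, and processing the result, which yields three connected subgraphs, each reported with its smallest vertex ID and its set of vertices.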

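GraphX is Spark's toolkit for graph computation. It supports a set of graph operations and ships commonly used algorithms in its library. This article introduces the Connected Components algorithm and illustrates it with an example.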

1. Related concepts

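For the basic concepts of graph theory, see the reference article "Basic concepts of graphs: graphs, connected graphs, complete graphs, cliques, networks, and subgraphs, with examples".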

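The example graph used in this article (its data is listed in section 2.2) has 3 maximal connected subgraphs, also called connected components.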
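To make "maximal connected subgraph" concrete, here is a minimal plain-Scala sketch (no Spark) that finds the components by breadth-first search, treating every edge as undirected. The object name `ComponentsBfs` is hypothetical, and the hard-coded edge list is copied from edges.txt in section 2.2 with the edge labels dropped; this is an illustration of the concept, not how GraphX computes it.

```scala
import scala.collection.mutable

// Hypothetical helper, not part of GraphX: connected components via BFS.
object ComponentsBfs {
  // Edges from edges.txt in section 2.2, labels dropped.
  val exampleEdges: Seq[(Long, Long)] = Seq(
    1L -> 3L, 2L -> 3L, 1L -> 2L, 3L -> 5L, 3L -> 4L, 4L -> 5L,
    6L -> 8L, 7L -> 8L, 1L -> 8L, 6L -> 7L, 8L -> 3L, 8L -> 5L,
    9L -> 12L, 10L -> 12L, 11L -> 12L, 9L -> 10L, 9L -> 11L, 13L -> 14L)

  def components(edges: Seq[(Long, Long)]): List[Set[Long]] = {
    // Undirected adjacency: each edge is usable in both directions.
    val adj = mutable.Map.empty[Long, mutable.Set[Long]]
    for ((a, b) <- edges) {
      adj.getOrElseUpdate(a, mutable.Set.empty) += b
      adj.getOrElseUpdate(b, mutable.Set.empty) += a
    }
    val seen = mutable.Set.empty[Long]
    val result = mutable.ListBuffer.empty[Set[Long]]
    // Visiting start vertices in ascending order lists components by their smallest ID.
    for (start <- adj.keys.toSeq.sorted if !seen(start)) {
      // Everything reachable from `start` forms one maximal connected subgraph.
      val comp = mutable.Set(start)
      val queue = mutable.Queue(start)
      seen += start
      while (queue.nonEmpty) {
        val v = queue.dequeue()
        for (n <- adj(v) if !seen(n)) {
          seen += n
          comp += n
          queue.enqueue(n)
        }
      }
      result += comp.toSet
    }
    result.toList
  }

  def main(args: Array[String]): Unit =
    components(exampleEdges).foreach(c => println(c.toSeq.sorted.mkString(",")))
}
```

On the example data this prints three lines, `1,2,3,4,5,6,7,8`, `9,10,11,12` and `13,14`, matching the three components reported in section 2.3.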

2. Computing the connected subgraphs of a graph

Calculate the connected component IDs for each vertex and construct a graph where each vertex’s value is the smallest vertex ID within its connected component.

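In other words, the method determines which connected component each vertex belongs to. In the resulting graph, each vertex's attribute is the smallest vertex ID in the connected component that contains it.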
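The "smallest vertex ID" label can be understood through minimum-label propagation: every vertex starts labelled with its own ID, and in each round it adopts the smallest label seen among its neighbours, ignoring edge direction, until no label changes. GraphX runs an equivalent computation in a distributed way on top of its Pregel API. The sketch below is a single-machine illustration of that idea under the name `MinLabelPropagation` (hypothetical); it is not the GraphX source.

```scala
import scala.collection.mutable

// Illustrative sketch of min-label propagation, not GraphX's implementation.
object MinLabelPropagation {
  def labels(edges: Seq[(Long, Long)]): Map[Long, Long] = {
    // Round 0: every vertex is labelled with its own ID.
    var label: Map[Long, Long] =
      edges.flatMap { case (a, b) => Seq(a, b) }.map(v => v -> v).toMap
    var changed = true
    while (changed) {
      changed = false
      val next = mutable.Map.empty[Long, Long] ++= label
      for ((a, b) <- edges) {
        // Both endpoints may receive the smaller label: direction is ignored.
        val m = math.min(label(a), label(b))
        if (m < next(a)) { next(a) = m; changed = true }
        if (m < next(b)) { next(b) = m; changed = true }
      }
      label = next.toMap
    }
    label
  }

  def main(args: Array[String]): Unit = {
    val edges = Seq(9L -> 12L, 10L -> 12L, 11L -> 12L, 13L -> 14L)
    println(labels(edges).toSeq.sorted.mkString(" "))
  }
}
```

For the four edges in `main`, label 9 reaches vertex 12 in the first round and vertices 10 and 11 in the second, so the program prints `(9,9) (10,9) (11,9) (12,9) (13,13) (14,13)`.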

The result of connectedComponents tells us which vertices lie in the same connected graph, so a large graph can be split into several connected subgraphs.
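Once every vertex carries its component ID, splitting the graph amounts to grouping vertices by that ID. The plain-Scala sketch below does the grouping in a single pass over two in-memory maps that stand in for `cc.vertices` and the `users` RDD of section 2.1; the object name `GroupByComponent` is hypothetical. On RDDs the same shape could be expressed with `join` followed by `groupByKey`, which avoids running one `subgraph` job per component; treat that as an alternative design, not what the program in 2.1 does.

```scala
// Hypothetical helper: group vertex names by connected-component ID.
object GroupByComponent {
  /** labels: vertex ID -> smallest vertex ID in its component (the shape of cc.vertices)
    * names:  vertex ID -> vertex name (the shape of the users RDD)
    */
  def group(labels: Map[Long, Long], names: Map[Long, String]): Map[Long, Seq[String]] =
    labels.toSeq
      .groupBy { case (_, componentId) => componentId }
      .map { case (componentId, members) =>
        // Order members by vertex ID; a vertex without a name maps to "".
        componentId -> members.map(_._1).sorted.map(v => names.getOrElse(v, ""))
      }

  def main(args: Array[String]): Unit = {
    val labels = Map(9L -> 9L, 10L -> 9L, 12L -> 9L, 13L -> 13L, 14L -> 13L)
    val names = Map(9L -> "javame", 10L -> "zhoney", 12L -> "HugeGraph",
      13L -> "java", 14L -> "James Gosling")
    group(labels, names).toSeq.sortBy(_._1).foreach { case (id, members) =>
      println(s"$id: ${members.mkString(",")}")
    }
  }
}
```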

The source code below computes the vertex set of each connected subgraph of a graph.

2.1. Source code

 package org.apache.spark.graphx.test

 import org.apache.spark.SparkContext
 import org.apache.spark.graphx.Graph
 import org.apache.spark.graphx.impl.{EdgePartitionBuilder, GraphImpl}
 import org.apache.spark.internal.Logging
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.storage.StorageLevel

 // Placed under org.apache.spark.graphx so it can use package-private GraphX internals
 // (EdgePartitionBuilder, GraphImpl) and Spark's internal Logging trait.
 object ConnectedComponentsExample extends Logging {
   def main(args: Array[String]): Unit = {
     val spark = SparkSession
       .builder
       .master("local[4]")
       .appName(s"${this.getClass.getSimpleName}")
       .getOrCreate()
     val sc = spark.sparkContext

     // Edges: comma-separated, source ID in column 0, destination ID in column 1.
     val graph = edgeListFile(sc, "data/graphx/edges.txt", srcIndex = 0, destIndex = 1, numEdgePartitions = 4)
     // Each vertex attribute becomes the smallest vertex ID in its connected component.
     val cc = graph.connectedComponents()

     // Vertices: "id|name|type". split() takes a regex, so '|' must be escaped.
     val users: RDD[(Long, String)] = sc.textFile("data/graphx/vertexs.txt")
       .map(_.trim)
       .filter(_.nonEmpty)
       .map { line =>
         val fields = line.split("\\|")
         (fields(0).toLong, fields(1))
       }

     // Vertex attribute becomes (componentId, name); a vertex without a name gets "".
     val newGraph = cc.outerJoinVertices(users)((vid, componentId, name) => (componentId, name.getOrElse("")))

     cc.vertices.map(_._2).distinct().collect().sorted.foreach { id =>
       // Keep only the vertices of component `id` and project out their names.
       val sub = newGraph.subgraph(vpred = (vid, vd) => vd._1 == id).mapVertices((vid, vd) => vd._2)
       println("Smallest vertex ID in the connected component: %s, vertex set: [%s]".format(id,
         sub.vertices.map(v => v._2).collect().mkString(",")))
     }

     spark.stop()
   }

   // Adapted from GraphLoader.edgeListFile: fields are split on ',' instead of whitespace,
   // and the source/destination columns are configurable. Every edge and vertex attribute is 1.
   def edgeListFile(
       sc: SparkContext,
       path: String,
       canonicalOrientation: Boolean = false,
       srcIndex: Int = 0,
       destIndex: Int = 1,
       numEdgePartitions: Int = -1,
       edgeStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY,
       vertexStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY)
     : Graph[Int, Int] = {
     val startTime = System.currentTimeMillis
     // Each line needs at least this many fields to contain both ID columns.
     val maxIndex = math.max(srcIndex, destIndex) + 1

     val lines =
       if (numEdgePartitions > 0) {
         sc.textFile(path, numEdgePartitions).coalesce(numEdgePartitions)
       } else {
         sc.textFile(path)
       }
     // Build one EdgePartition per input partition; blank lines and '#' comments are skipped.
     val edges = lines.mapPartitionsWithIndex { (pid, iter) =>
       val builder = new EdgePartitionBuilder[Int, Int]
       iter.foreach { line =>
         if (!line.isEmpty && line(0) != '#') {
           val lineArray = line.split(",")
           if (lineArray.length < maxIndex) {
             throw new IllegalArgumentException("Invalid line: " + line)
           }
           val srcId = lineArray(srcIndex).toLong
           val dstId = lineArray(destIndex).toLong
           // canonicalOrientation stores every edge as (smaller ID, larger ID).
           if (canonicalOrientation && srcId > dstId) {
             builder.add(dstId, srcId, 1)
           } else {
             builder.add(srcId, dstId, 1)
           }
         }
       }
       Iterator((pid, builder.toEdgePartition))
     }.persist(edgeStorageLevel).setName("GraphLoader.edgeListFile - edges (%s)".format(path))
     edges.count() // materialize the edges so the load time below is meaningful

     logInfo("It took %d ms to load the edges".format(System.currentTimeMillis - startTime))

     GraphImpl.fromEdgePartitions(edges, defaultVertexAttr = 1, edgeStorageLevel = edgeStorageLevel,
       vertexStorageLevel = vertexStorageLevel)
   }
 }
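The per-line logic of edgeListFile can be exercised without a cluster. The plain-Scala sketch below mirrors it for a single line; `EdgeLineParser` is a hypothetical name, and the `.trim` on each ID is an addition not present in the original helper. It makes explicit that the third column of edges.txt (the edge label, such as created or knows) is ignored, and that canonicalOrientation rewrites each edge as (smaller ID, larger ID).

```scala
// Hypothetical single-line mirror of the parsing inside edgeListFile.
object EdgeLineParser {
  /** Returns None for blank lines and '#' comments, otherwise Some((srcId, dstId)).
    * Columns other than srcIndex/destIndex, such as the edge label, are ignored.
    */
  def parse(line: String,
            srcIndex: Int = 0,
            destIndex: Int = 1,
            canonicalOrientation: Boolean = false): Option[(Long, Long)] = {
    if (line.isEmpty || line(0) == '#') None
    else {
      val fields = line.split(",")
      // Same check as edgeListFile: the line must contain both ID columns.
      if (fields.length < math.max(srcIndex, destIndex) + 1)
        throw new IllegalArgumentException("Invalid line: " + line)
      val srcId = fields(srcIndex).trim.toLong
      val dstId = fields(destIndex).trim.toLong
      // With canonicalOrientation, every edge is stored as (smaller ID, larger ID).
      if (canonicalOrientation && srcId > dstId) Some((dstId, srcId)) else Some((srcId, dstId))
    }
  }

  def main(args: Array[String]): Unit =
    Seq("# comment", "1,3,created", "8,3,implements").foreach { line =>
      println(s"$line -> ${parse(line, canonicalOrientation = true)}")
    }
}
```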

2.2. Data

 vertexs.txt
 1|okram|person
 2|spmallette|person
 3|TinkerPop|software
 4|TinkerGraph|software
 5|Gremlin|language
 6|dalaro|person
 7|mbroecheler|person
 8|Titan|software
 9|javame|person
 10|zhoney|person
 11|linary|person
 12|HugeGraph|software
 13|java|software
 14|James Gosling|person

 edges.txt
 1,3,created
 2,3,created
 1,2,knows
 3,5,define
 3,4,contains
 4,5,supports
 6,8,created
 7,8,created
 1,8,created
 6,7,knows
 8,3,implements
 8,5,supports
 9,12,created
 10,12,created
 11,12,created
 9,10,knows
 9,11,knows
 13,14,created
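vertexs.txt uses `|` as its separator. `String.split` interprets its argument as a regular expression, and in a regex `|` means alternation, so the separator has to be escaped as `"\\|"` (or passed through `java.util.regex.Pattern.quote`); an unescaped `"|"` matches the empty string and splits between every character. The sketch below parses one line that way; `VertexLineParser` is a hypothetical name, and trimming and skipping blank lines are assumptions about how the file should be cleaned.

```scala
// Hypothetical parser for one line of vertexs.txt ("id|name|type").
object VertexLineParser {
  /** Parses "id|name|type" into (id, name); blank lines yield None. */
  def parse(line: String): Option[(Long, String)] = {
    val trimmed = line.trim
    if (trimmed.isEmpty) None
    else {
      // "\\|" in Scala source is the regex \| , i.e. a literal pipe character.
      val fields = trimmed.split("\\|")
      Some((fields(0).toLong, fields(1)))
    }
  }

  def main(args: Array[String]): Unit = {
    println(parse("14|James Gosling|person"))
    // Unescaped: '|' is an alternation of two empty patterns, so it splits everywhere.
    println("1|okram".split("|").mkString("[", ", ", "]"))
  }
}
```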

2.3. Results

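Smallest vertex ID in the connected component: 1, vertex set: [TinkerGraph,Titan,okram,Gremlin,dalaro,spmallette,TinkerPop,mbroecheler]

Smallest vertex ID in the connected component: 9, vertex set: [HugeGraph,javame,zhoney,linary]

Smallest vertex ID in the connected component: 13, vertex set: [java,James Gosling]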
