【Spark】(十七)GraphX之connectedComponents
 发布时间 
 阅读量: 
 阅读量 
连通分量
连通分量是一个子图,其中任何两个顶点通过一条边或一系列边相互连接,其顶点是原始图顶点集的子集,其边是原始图边集的子集

首先准备数据源,存放地址为Linux中/opt/kbfile目录下
links.csv
    1,2,friend
    1,3,sister
    2,4,brother
    3,2,boss
    4,5,client
    1,9,friend
    6,7,cousin
    7,9,coworker
    8,9,father
    10,11,colleague
    10,12,colleague
    11,12,colleague
    
    
      
      
      
      
      
      
      
      
      
      
      
      
    
        people.csv
    4,Dave,25
    6,Faith,21
    8,Harvey,47
    2,Bob,18
    1,Alice,20
    3,Charlie,30
    7,George,34
    9,Ivy,21
    5,Eve,30
    10,Lily,35
    11,Helen,35
    12,Ann,35
    
    
      
      
      
      
      
      
      
      
      
      
      
      
    
        
图结构

案例:使用Spark-Shell
    // 定义样例类
    scala> case class Person(name:String,age:Int)
    
    
      
      
    
        
    // 读取people.csv数据
    scala> val people = sc.textFile("file:///opt/kb09file/people.csv")
    scala> people.collect.foreach(println)
    
    
      
      
      
    
        
    // 生成peopleRDD
    scala> val peopleRDD = people.map(x => x.split(",")).map(x => (x(0).toLong,Person(x(1),x(2).toInt)))
    peopleRDD: org.apache.spark.rdd.RDD[(Long, Person)] = MapPartitionsRDD[1125] at map at <console>:30
    
    scala> peopleRDD.collect.foreach(println)
    
    
      
      
      
      
      
    
        
    // 读取links.csv数据
    scala> val links = sc.textFile("file:///opt/kb09file/links.csv")
    
    scala> links.collect.foreach(println)
    
    
      
      
      
      
    
        
    // 生成linksRDD
    scala> val linksRDD = links.map(x => {
     | val row = x.split(",");
     | Edge(row(0).toInt,row(1).toInt,row(2))
     | })
    linksRDD: org.apache.spark.rdd.RDD[org.apache.spark.graphx.Edge[String]] = MapPartitionsRDD[1128] at map at <console>:28
    
    scala> linksRDD.collect.foreach(println)
    
    
      
      
      
      
      
      
      
      
    
        
    // 创建图
    scala> val graph = Graph(peopleRDD,linksRDD)
    graph: org.apache.spark.graphx.Graph[Person,String] = org.apache.spark.graphx.impl.GraphImpl@5ef1f19e
    
    scala> graph.vertices.collect.foreach(println)
    
    scala> graph.triplets.collect.foreach(println)
    
    
      
      
      
      
      
      
      
    
        
    // 调用connectedComponents
    scala> val cc = graph.connectedComponents
    cc: org.apache.spark.graphx.Graph[org.apache.spark.graphx.VertexId,String] = org.apache.spark.graphx.impl.GraphImpl@282699f
    // 输出结果
    scala> cc.vertices.collect.foreach(println)
    
    
      
      
      
      
      
    
        
从结果中可以看到通过计算之后的图,每个顶点多了一个属性,这个属性表示的就是这个顶点所在的连通图中的最小顶点id。例如顶点11所在的连通图中的最小顶点id是10,顶点4所在的连通图中的最小顶点id是1。
扩展
经过connectedComponents得到的结果,可以知道哪些顶点在一个连通图中,这样就可以将一个大图拆分成若干个连通子图。
    // 分析:
    // cc:(4,1)   peopleRDD:(4,Person(Dave,25))
    // (id,mincc,people):(4,1,Person(Dave,25))
    // (mincc,people.get.name,people.get.age):(1,Dava,25)
    
    scala> val newGraph = cc.outerJoinVertices(peopleRDD)((id,mincc,people)=>(mincc,people.get.name,people.get.age))
    newGraph: org.apache.spark.graphx.Graph[(org.apache.spark.graphx.VertexId, String, Int),String] = org.apache.spark.graphx.impl.GraphImpl@7e42389c
    
    
    scala> newGraph.vertices.collect.foreach(println)
    
    
      
      
      
      
      
      
      
      
      
      
    
        
    // 分析:
    // cc:(4,1) => cc.vertices.map(_._2) = 1
    // newGraph:(4,(1,Dave,25)) => id2._1 = 1
    
    scala> cc.vertices.map(_._2).collect.distinct.foreach(id =>{
    val sub = newGraph.subgraph(vpred = (id1,id2) => id2._1 == id)
    sub.triplets.collect.foreach(println)
    })
    
    
      
      
      
      
      
      
      
      
    
        
分析:
- 通过connectedComponents得到的新图的顶点属性已经没有了原始的那些信息,所以需要和原始信息作一个join,例如val newGraph = cc.outerJoinVertices(peopleRDD)((id, cc, p)=>(cc,p.get.name,p.get.age))
 - cc.vertices.map(_._2).collect.distinct会得到所有连通图中id最小的顶点编号
 - 通过连通图中最小顶点编号,使用subgraph方法得到每个连通子图
 
全部评论 (0)
 还没有任何评论哟~ 
