Advertisement

spark graphx 实现二跳邻居统计——使用pregel

阅读量:

本文参考自: 原文地址

本文是对二跳邻居统计的实战,因为用到了pregel,需要对pregel模型有一些大致的了解,例如各个参数的意义,各个函数的作用,以及大致的流程。最核心的应该就是消息发送函数这个部分,注释中有对两轮迭代的过程有解释。

复制代码
 def main(args: Array[String]): Unit = {

    
     case class Person(id: String, tel: String, relatel: String, fritel: String, birthplace: String, homeaddress: String)
    
     case class Relation(ty: Int)
    
  
    
     Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
    
     Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)
    
     @transient
    
     val conf = new SparkConf().setAppName("RelatianNet")
    
     conf.setMaster("local")
    
  
    
     @transient
    
     val sc = new SparkContext(conf)
    
     val vertices = sc.textFile("vertex1.txt")//节点包含用户的id等信息
    
     val id_node_table = new scala.collection.mutable.HashMap[String, Int]()//存放节点id和对应编号的键值对
    
     var nodeid = 0
    
     for (line <- Source.fromFile("vertex.txt", "GBK").getLines) {
    
       val row = line.split("\t")
    
       id_node_table += (row(0) -> nodeid)//给每个节点赋予编号
    
       nodeid += 1
    
     }
    
     val verticesRDD: RDD[(VertexId, Person)] = vertices map { line
    
     =>
    
       val row = line.split("\t")
    
       (id_node_table(row(0)), Person(row(0), row(1), row(2), row(3), row(4), row(5)))
    
     }
    
     println(id_node_table.size)
    
     val edges = sc.textFile(path + "edges1.txt")
    
     val edgesRDD: RDD[Edge[Relation]] = edges map { line
    
     =>
    
       val row = line.split("\t")
    
       Edge(id_node_table(row(0)), id_node_table(row(1)), Relation(row(2).toInt)) //EDGE表由客户->联系人以及客户->第二联系人两张边表组成
    
     }
    
     val relationNet: Graph[Person, Relation] = Graph(verticesRDD, edgesRDD)
    
     val result = relationNet.degrees.filter(x => x._2 >= 2).count()
    
     println(result + "个人的关系人多于2人")
    
  
    
  
    
     //pregel实现
    
     type VMap = Map[VertexId, Int] //定义每个节点存放的数据类型,为若干个(节点编号,一个整数)构成的map,当然发送的消息也得遵守这个类型
    
  
    
     /** * 节点数据的更新 就是集合的union
    
       */
    
     def vprog(vid: VertexId, vdata: VMap, message: VMap) //每轮迭代后都会用此函数来更新节点的数据(利用消息更新本身),vdata为本身数据,message为消息数据
    
     : Map[VertexId, Int] = addMaps(vdata, message)
    
  
    
     /** * 节点数据的更新 就是集合的union
    
       */
    
     def sendMsg(e: EdgeTriplet[VMap, _]) = {
    
     //取两个集合的差集  然后将生命值减1
    
 	val srcMap = (e.dstAttr.keySet -- e.srcAttr.keySet).map { k => k -> (e.dstAttr(k) - 1) }.toMap
    
 	val dstMap = (e.srcAttr.keySet -- e.dstAttr.keySet).map { k => k -> (e.srcAttr(k) - 1) }.toMap       
    
 	if (srcMap.size == 0 && dstMap.size == 0)         
    
 		Iterator.empty       
    
 	else         
    
 		Iterator((e.dstId, dstMap), (e.srcId, srcMap))//发送消息的内容    
    
     }    
    
 		
    
     /** * 消息的合并      
    
     */    
    
     def addMaps(spmap1: VMap, spmap2: VMap): VMap =      
    
 	(spmap1.keySet ++ spmap2.keySet).map { //合并两个map,求并集        
    
 		k => k -> math.min(spmap1.getOrElse(k, Int.MaxValue), spmap2.getOrElse(k, Int.MaxValue)) //对于交集的点的处理,取spmap1和spmap2中最小的值      
    
 	}.toMap   
    
 	
    
     val two = 2    //这里是二跳邻居 所以只需要定义为2即可    
    
     val newG = relationNet.mapVertices((vid, _) => Map[VertexId, Int](vid -> two)) //每个节点存储的数据由一个Map组成,开始的时候只存储了 (该节点编号,2)这一个键值对      
    
 	.pregel(Map[VertexId, Int](), two, EdgeDirection.Out)(vprog, sendMsg, addMaps)    
    
     //pregel参数    
    
     //第一个参数 Map[VertexId, Int]() ,是初始消息,面向所有节点,使用一次vprog来更新节点的值,由于Map[VertexId, Int]()是一个空map类型,所以相当于初始消息什么都没做    
    
     //第二个参数 two,是迭代次数,此时two=2,代表迭代两次(进行两轮的active节点发送消息),第一轮所有节点都是active节点,第二轮收到消息的节点才是active节点。    
    
     //第三个参数 EdgeDirection.Out,是消息发送方向,out代表源节点-》目标节点 这个方向    //pregel 函数参数    //第一个函数 vprog,是用户更新节点数据的程序,此时vprog又调用了addMaps    
    
     //第二个函数 sendMsg,是发送消息的函数,此时用目标节点的map与源节点的map做差,将差集的数据减一;然后同样用源节点的map与目标节点的map做差,同样差集的数据减一        
    
     //第一轮迭代,由于所有节点都只存着自己和2这个键值对,所以对于两个用户之间存在变关系的一对点,都会收到对方的一条消息,内容是(本节点,1)和(对方节点,1)这两个键值对        
    
     //第二轮迭代,收到消息的节点会再一次的沿着边发送消息,此时消息的内容变成了(自己的朋友,0)    //第三个函数 addMaps, 是合并消息,将map合并(相当于求个并集),不过如果有交集(key相同),那么,交集中的key取值(value)为最小的值。    
    
 	
    
     //过滤得到二跳邻居 就是value=0 的顶点    
    
     val twoJumpFirends = newG.vertices      
    
 	.mapValues(_.filter(_._2 == 0).keys) //由于在第二轮迭代,源节点会将自己的邻居(非目标节点)推荐给目标节点——各个邻居就是目标节点的二跳邻居,并将邻居对应的值减为0,    
    
     //twoJumpFirends.collect().foreach(println(_))    
    
     twoJumpFirends.filter(x => x._2 != Set()).foreach(println(_)) //把二跳邻居集合非空的(点,{二跳邻居集合})打印出来
    
 } 
    
  
    
  
    
  
    
  
    
    
    
    

全部评论 (0)

还没有任何评论哟~