Advertisement

spark进阶(九):GraphX使用

阅读量:

GraphX是Spark中的一个分布式图计算框架,是对Spark RDD的扩展。这里所说的图并不是图片,而是一个抽象的关系网。例如,社交应用微信、QQ、微博等用户之间的好友、关注等存在错综复杂的联系,这种联系构成了一张巨大的关系网,我们把这个关系网称为图。GraphX目前适用于微信、微博、社交网络、电子商务等类型的产品,也越来越多地应用于推荐领域的人群划分、年龄预测、标签推理等。

Vertices、edges、triplets是Spark GraphX中3个最重要的概念。

Vertices对应的RDD名称为VertexRDD,VertexRDD继承自RDD[(VertexId, VD) ],RDD的类型是VertexId和VD,其中VD是属性的类型,也就是说,VertexRDD有ID和顶点属性。

Edges对应的是EdgeRDD,EdgeRDD继承的RDD的类型是Edge[ED],属性有3个:源顶点的ID、目标顶点的ID、边属性。

Triplets的属性有源顶点ID、源顶点属性、边属性、目标顶点ID、目标顶点属性,Triplets其实是对Vertices和Edges做了Join操作

一、简单使用

其实顶点和边都是RDD,通过顶点和边之间的关系构建的图其三元组关系也是一个RDD,都适用RDD的一些操作

复制代码
    /** * @author: ffzs
     * @Date: 2021/10/11 下午3:48
     */
    object GraphX {
      def main(args: Array[String]): Unit = {
    
    val spark = SparkSession.builder
      .appName("SparkGraphXExample")
      .master("local[*]")
      .getOrCreate()
    
    val sc = spark.sparkContext
    sc.setLogLevel("WARN")
    
    val users:RDD[(VertexId, (String, String))] = sc.parallelize(Array(
      (3L, ("zhangsan", "student")),
      (2L, ("lisi", "prof")),
      (5L, ("wangwu", "prof")),
      (7L, ("zhaosi", "postdoc")),
    ))
    
    val relationships: RDD[Edge[String]] = sc.parallelize(Array(
      Edge(3L, 7L, "collab"),
      Edge(5L, 3L, "advisor"),
      Edge(2L, 5L, "colleague"),
      Edge(5L, 7L, "pi"),
      Edge(6L, 7L, "pi")
    ))
    
    val defaultUser = ("ffzs", "Missing")
    val graph = Graph(users, relationships, defaultUser)
    
    // 图中获取顶点,进行过滤操作
    val filteredVert = graph.vertices
      .filter(_._2._2 == "postdoc")
      .count
    println(f"博士后节点的数量为: $filteredVert")
    
    // 图中获取边,进行过滤操作
    val filteredEdge = graph.edges
      .filter(edge => edge.srcId > edge.dstId)
      .count
    println(f"途中边起始ID大于终止ID的数量为: $filteredEdge")
    
    // 对图中的三元组进行操作 (start vertices, end vertices, relation)
    graph.triplets.foreach(println)
    
    // 计算每个顶点的入度
    println("每个顶点的入度情况:")
    val inDegrees = graph.inDegrees
    inDegrees.foreach(println)
    
    // 计算每个顶点的出度
    println("每个顶点的出度情况:")
    val outDegrees = graph.outDegrees
    outDegrees.foreach(println)
    
    // 计算每个顶点的度: 入度 + 出度
    val degrees = graph.degrees
    
    // 对图中所有边关系进行翻转
    val reverseGraph = graph.reverse
    
    // 通过subgraph对图进行过滤获得新的图
    println("通过subgraph对图进行过滤获得新的图")
    val subGraph = graph.subgraph(
      vpred = (id, attr) => attr._2 == "prof"
    )
    subGraph.triplets.foreach(println)
    
    // groupEdges 对相同起始终止的边进行合并
    println("groupEdges 对相同起始终止的边进行合并")
    val grouped = graph.groupEdges(merge = (a, b) => a + b)
    grouped.edges.collect().foreach(println)
    
    // mapVertices 更改顶点属性
    println("通过 mapVertices 更改顶点属性")
    graph.mapVertices((id, attr) => (attr._1, if (attr._2=="student") "student" else "teacher"))
      .vertices
      .foreach(println)
    
    // mapEdges 更改顶点属性
    println("通过 mapEdges 更改顶点属性")
    graph.mapEdges(edge => if (edge.attr == "pi") "friend" else edge.attr)
      .edges.foreach(println)
      }
    }
    
    
    
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
    
  • 计算图中每一个顶点的入度和出度情况
    • 对图进行一些过滤生成新的图
    • 更改图中的属性
    • 对图中重复边进行聚合
复制代码
    博士后节点的数量为: 1
    途中边起始ID大于终止ID的数量为: 1
    ((5,(wangwu,prof)),(7,(zhaosi,postdoc)),pi)
    ((3,(zhangsan,student)),(7,(zhaosi,postdoc)),collab)
    ((2,(lisi,prof)),(5,(wangwu,prof)),colleague)
    ((5,(wangwu,prof)),(3,(zhangsan,student)),advisor)
    ((6,(ffzs,Missing)),(7,(zhaosi,postdoc)),pi)
    每个顶点的入度情况:
    (5,1)
    (7,3)
    (3,1)
    每个顶点的出度情况:
    (5,2)
    (6,1)
    (3,1)
    (2,1)
    通过subgraph对图进行过滤获得新的图
    ((2,(lisi,prof)),(5,(wangwu,prof)),colleague)
    groupEdges 对相同起始终止的边进行合并
    Edge(3,7,collab)
    Edge(5,3,advisor)
    Edge(2,5,colleague)
    Edge(5,7,pi)
    Edge(6,7,pi)
    通过 mapVertices 更改顶点属性
    (7,(zhaosi,teacher))
    (2,(lisi,teacher))
    (6,(ffzs,teacher))
    (3,(zhangsan,student))
    (5,(wangwu,teacher))
    通过 mapEdges 更改顶点属性
    Edge(5,7,friend)
    Edge(5,3,advisor)
    Edge(3,7,collab)
    Edge(2,5,colleague)
    Edge(6,7,friend)
    
    
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
    

全部评论 (0)

还没有任何评论哟~