Advertisement

【Spark】(十五)基于Spark GraphX的图形数据分析

阅读量:

为什么需要图计算

许多大数据以大规模图或网络的形式呈现
许多非图结构的大数据,常会被转换为图模型进行分析
图数据结构很好地表达了数据之间的关联性

图(Graph)的基本概念

  • 1、图是由顶点集合(Vertex)及顶点间的关系集合(边edge)组成的一种网状数据结构
    通常表示为二元组:Graph=(V,E)
    可以对事物之间的关系建模

  • 2、应用场景:
    在地图应用中寻找最短路径
    社交网络关系
    网页间超链接关系

图的术语

1、顶点(Vertex)
2、边(Edge)

在这里插入图片描述
在这里插入图片描述
3、有向图
在这里插入图片描述
在这里插入图片描述

4、无向图
在这里插入图片描述
在这里插入图片描述
5、有环图
包含一系列顶点连接的回路(环路)
在这里插入图片描述
6、无环图
DAG即为有向无环图
在这里插入图片描述

7、度:一个顶点所有边的数量
出度:指从当前顶点指向其他顶点的边的数量
入度:其他顶点指向当前顶点的边的数量

图的经典表示法

邻接矩阵

  • 1、对于每条边,矩阵中相应单元格值为1
  • 2、对于每个循环,矩阵中相应单元格值为2,方便在行或列上求得顶点度
    在这里插入图片描述

Spark GraphX

一、简介

GraphX是Spark提供分布式图计算API

GraphX特点 :

  • 1、基于内存实现了数据的复用与快速读取
  • 2、通过弹性分布式属性图(Property Graph)统一了图视图与表视图
  • 3、与Spark Streaming、Spark SQL和Spark MLlib等无缝衔接

二、GraphX核心抽象

弹性分布式属性图(Resilient Distributed Property Graph)

  • 1、顶点和边都带属性的有向多重图
    在这里插入图片描述

  • 2、一份物理存储,两种视图
    在这里插入图片描述
    注:对Graph视图的所有操作,最终都会转换成其关联的Table视图的RDD操作来完成

三、GraphX API

Graph[VD,ED]
VertexRDD[VD]
EdgeRDD[ED]
EdgeTriplet[VD,ED]
Edge:样例类
VertexId:Long的别名

图(graph)函数:

复制代码
    aggregateMessages         getCheckpointFiles   ops                    staticPageRank
    cache                     groupEdges           outDegrees             staticParallelPersonalizedPageRank
    checkpoint                inDegrees            outerJoinVertices      staticPersonalizedPageRank
    collectEdges              isCheckpointed       pageRank               stronglyConnectedComponents
    collectNeighborIds        joinVertices         partitionBy            subgraph
    collectNeighbors          mapEdges             persist                triangleCount
    connectedComponents       mapTriplets          personalizedPageRank   triplets
    convertToCanonicalEdges   mapVertices          pickRandomVertex       unpersist
    degrees                   mask                 pregel                 unpersistVertices
    edges                     numEdges             removeSelfEdges        vertices
    filter                    numVertices          reverse
    
    
      
      
      
      
      
      
      
      
      
      
      
    

在这里插入图片描述
在这里插入图片描述
案例一:

复制代码
    // 导入Spark Graph包
    import org.apache.spark.graphx._
    // 创建 vertices 顶点RDD
    val vertices = sc.makeRDD(Seq((1L,1),(2L,2),(3L,3)))
    // 创建 edges 边RDD 
    val edges = sc.makeRDD(Seq(Edge(1L,2L,1),Edge(2L,3L,2)))
    // 创建 graph 对象
    val graph = Graph(vertices,edges)
    
    
      
      
      
      
      
      
      
      
    
在这里插入图片描述
复制代码
    // 获取graph图对象的vertices信息
    graph.vertices.collect
    // 获取graph图对象的edges信息
    graph.edges.collect
    // 获取graph图对象的triplets信息
    graph.triplets.collect
    
    
      
      
      
      
      
      
    

在这里插入图片描述
案例二:

复制代码
    cd /opt/kb09file/
    vi followers.txt
    cat followers.txt
    
    
      
      
      
    
复制代码
    import org.apache.spark.graphx.GraphLoader
    //加载边列表文件创建图,文件每行描述一条边,格式:srcId dstId。顶点与边的属性均为1
    val graphLoad = GraphLoader.edgeListFile(sc, "file:///opt/kb09file/followers.txt")
    // 获取graph图对象的vertices信息
    graphLoad.vertices.collect
    // 获取graph图对象的edges信息
    graphLoad.edges.collect
    // 获取graph图对象的triplets信息
    graphLoad.triplets.collect
    
    
      
      
      
      
      
      
      
      
      
    
在这里插入图片描述

四、属性图应用

示例-1
构建用户合作关系属性图

  • 1、顶点属性
    用户名
    职业

  • 2、边属性
    合作关系
    在这里插入图片描述

复制代码
    scala> val users = sc.makeRDD(Array((3L,("rxin","student")),(7L,("jgonzal","postdoc")),(5L,("franklin","professor")),(2L,("istoica","professor"))))
    
    scala> val relationship = sc.makeRDD(Array(Edge(3L,7L,"Collaborator"),Edge(5L,3L,"Advisor"),Edge(2L,5L,"Colleague"),Edge(5L,7L,"PI")))
    
    scala> val graphUser = Graph(users,relationship)
    
    
      
      
      
      
      
    
在这里插入图片描述
复制代码
    // 获取graph图对象的vertices信息
    scala> graphUser.vertices.collect
    // 获取graph图对象的edges信息
    scala> graphUser.edges.collect
    // 获取graph图对象的triplets信息
    scala> graphUser.triplets.collect
    
    
      
      
      
      
      
      
    

在这里插入图片描述
示例-2

  • 构建用户社交网络关系
    顶点:用户名、年龄
    边:打call次数

  • 找出大于30岁的用户

  • 假设打call超过5次,表示真爱。请找出他(她)们
    在这里插入图片描述

复制代码
    val userRDD = sc.makeRDD(Array((1L,("Alice",28)),(2L,("Bob",27)),(3L,("Charlie",65)),(4L,("David",42)),(5L,("Ed",55)),(6L,("Fran",50))))
    
    val usercallRDD = sc.makeRDD(Array(Edge(2L,1L,7),Edge(4L,1L,1),Edge(2L,4L,2),Edge(3L,2L,4),Edge(5L,2L,2),Edge(5L,3L,8),Edge(3L,6L,3),Edge(5L,6L,3)))
    
    val userCallGraph = Graph(userRDD,usercallRDD)
    
    
      
      
      
      
      
    
在这里插入图片描述
复制代码
    userCallGraph.vertices.collect
    userCallGraph.edges.collect
    userCallGraph.triplets.collect
    
    
      
      
      
    
在这里插入图片描述
复制代码
    // 找出大于30岁的用户
    //方式一
    userCallGraph.vertices.filter(v => v._2._2 > 30).collect.foreach(println)
    // 等同于:
    userCallGraph.vertices.filter(v => v._2._2 > 30).collect.foreach(x => {println("name:"+x._2._1+" age:"+x._2._2)})
    
    // 方式二
    userCallGraph.vertices.filter{ case (id,(name,age)) => age>30 }.collect.foreach(println)
    
    
      
      
      
      
      
      
      
      
    

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

复制代码
    scala> userCallGraph.triplets.collect.foreach(x => println(x.dstAttr))
    
    scala> userCallGraph.triplets.collect.foreach(x => println(x.srcAttr))
    
    // 取出triplets中的名字和联系
    scala> userCallGraph.triplets.collect.foreach(x => println(x.srcAttr._1+" like "+x.dstAttr._1+" stage: "+x.attr))
    
    
      
      
      
      
      
      
    

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

复制代码
    // 假设打call超过5次,表示真爱。请找出他(她)们
    scala> userCallGraph.triplets.filter(x => x.attr>5).collect.foreach(x => println(x.srcAttr._1+" like "+x.dstAttr._1+" stage: "+x.attr))
    
    
      
      
    
在这里插入图片描述

五、查看图信息

顶点数量

复制代码
    scala> userCallGraph.numVertices
    res35: Long = 6
    
    
      
      
    

在这里插入图片描述
边数量

复制代码
    scala> userCallGraph.numEdges
    res38: Long = 8
    
    
      
      
    

在这里插入图片描述
度、入度、出度

复制代码
    // 各顶点度的数量
    scala> userCallGraph.degrees.collect.foreach(println)
    
    // 各顶点入度数量
    scala> userCallGraph.inDegrees.collect.foreach(println)
    
    // 各顶点出度数量
    scala> userCallGraph.outDegrees.collect.foreach(println)
    
    
      
      
      
      
      
      
      
      
    

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

图的算子

在这里插入图片描述

1、基本算子

numEdges 边数量
numVertices 顶点数量
inDegrees: VertexRDD 入度
outDegrees: VertexRDD 出度
degrees: VertexRDD 度

2、属性算子:类似于RDD的map操作

在这里插入图片描述
这里每一个操作产生一个新图,其顶点和边被用户定义的map函数修改了

mapVertices 变换结构

功能是transform each vertex attribute in the graph using the map function。即对已有图的顶点属性做转换

复制代码
    scala> val t1_graph = userCallGraph.mapVertices{case(vertextId,(name,age)) => (vertextId,name)}
    scala> t1_graph.vertices.collect.foreach(println)
    (上下两条语句效果一致)
    scala> val t2_graph = userCallGraph.mapVertices((id,attr) => (id,attr._1))
    scala> t2_graph.vertices.collect.foreach(println)
    
    
      
      
      
      
      
    

在这里插入图片描述
在这里插入图片描述
mapEdges 对已有图的边属性做转换

复制代码
    scala> val t3_graph = userCallGraph.mapEdges(e => Edge(e.srcId,e.dstId,e.attr*7.0))
    scala> t3_graph.vertices.collect.foreach(println)
    
    
      
      
    
在这里插入图片描述

3、结构算子

在这里插入图片描述
reverse :作用就是把edge的方向反过来,在这里就是把每个人的关系反过来一下

复制代码
    scala> val reverseUserCallGraph = userCallGraph.reverse
    scala> reverseUserCallGraph.triplets.collect.foreach(println)
    
    
      
      
    

在这里插入图片描述
subgraph: 根据条件过滤掉一部分数据,生成满足顶点与边的条件的子图

复制代码
    scala> userCallGraph.subgraph(vpred = (id,attr) => attr._2<65).triplets.collect.foreach(println)
    
    
      
    

注:attr:(name,age)
在这里插入图片描述

复制代码
    scala> userCallGraph.subgraph(vpred = (id,attr) => {  println("subgrap in",id,attr); attr._2<65}).triplets.collect.foreach(println)
    
    
      
    
在这里插入图片描述
复制代码
    scala> userCallGraph.subgraph(epred=(ep) =>ep.srcAttr._2<65).triplets.collect.foreach(println)
    
    
      
    
在这里插入图片描述

4、Join算子:从外部的RDDs加载数据,修改顶点属性

在这里插入图片描述
joinVertices:
Graph的joinVertices运算符与输入RDD顶点进行连接并返回通过应用用户定义获得的顶点属性的新图形。RDD中没有匹配值的顶点保留其原始值。

复制代码
    scala> val two = sc.makeRDD(Array((1L,"kgc.cn"),(2L,"qq.com"),(3L,"163.com")))
    
    scala> userCallGraph.joinVertices(two)((id,v,cmpy) => (v._1 +"@"+cmpy,v._2))
    res26: org.apache.spark.graphx.Graph[(String, Int),Int] = org.apache.spark.graphx.impl.GraphImpl@6e5c083d
    
    scala> userCallGraph.joinVertices(two)((id,v,cmpy) => (v._1+"@"+cmpy,v._2)).triplets.collect.foreach(println)
    
    
      
      
      
      
      
      
    

在这里插入图片描述
outerJoinVertices:
注:RDD中的顶点不匹配时,值为None。

复制代码
    scala> val two = sc.makeRDD(Array((1L,"kgc.cn"),(2L,"qq.com"),(3L,"163.com")))
    
    scala> userCallGraph.outerJoinVertices(two)((id,v,cmpy) => (v._1 +"@"+cmpy,v._2))
    res49: org.apache.spark.graphx.Graph[(String, Int),Int] = org.apache.spark.graphx.impl.GraphImpl@6a35e84a
    
    scala> userCallGraph.outerJoinVertices(two)((id,v,cmpy) => (v._1 +"@"+cmpy,v._2)).triplets.collect.foreach(println)
    
    
      
      
      
      
      
      
    
在这里插入图片描述

GraphX API 应用

案例:计算用户粉丝数量(顶点的入度即为粉丝数量)

复制代码
    // 定义样例类
    scala> case class User(name:String,age:Int,inDeg:Int,outDeg:Int)
    // 将顶点入度存入顶点属性中
    scala> userCallGraph.outerJoinVertices(userCallGraph.inDegrees){ case(id,u,indeg) => User(u._1,u._2,indeg.getOrElse(0),0)}
    
    scala> res30.vertices.collect.foreach(println)
    
    
      
      
      
      
      
      
    
在这里插入图片描述
复制代码
    // 将顶点入度、出度存入顶点属性中
    scala> userCallGraph.outerJoinVertices(userCallGraph.inDegrees){ case(id,u,indeg) => User(u._1,u._2,indeg.getOrElse(0),0)}.outerJoinVertices(userCallGraph.outDegrees){case(id,u,outdeg)=>User(u.name,u.age,u.inDeg,outdeg.getOrElse(0))}
    
    scala> res35.vertices.collect.foreach(println)
    
    
      
      
      
      
    
在这里插入图片描述

全部评论 (0)

还没有任何评论哟~