Advertisement

2020.11.23课堂笔记(基于Spark GraphX的图形数据分析)

阅读量:

理解图(Graph)的基本概念与术语

一种网状数据结构由节点集合(node)及节点间的关系集合(关联)构成于

图的术语

顶点(Vertex)
边(Edge)
通常表示为二元组:Graph=(V,E)
集合V={v1,v2,v3}
集合E={(v1,v2),(v1,v3),(v2,v3)}

在这里插入图片描述

有向图
G=(V,E)
V={A,B,C,D,E}
E={<A,B>,<B,C>,<B,D>,<C,E>,<D,A>,<E,D>}

在这里插入图片描述

无向图
G=(V,E)
V={A,B,C,D,E}
E={(A,B),(A,D),(B,C),(B,D),(C,E),(D,E)}

在这里插入图片描述

有环图
包含一系列顶点连接的回路(环路)

在这里插入图片描述

无环图
DAG即为有向无环图

在这里插入图片描述

该图论中的被定义为一个特定项节点连接的所有边的数量之总和。具体而言,
出度被定义为从该节点出发指向其他任意项节点的有向关系数量,
而入度过量则表示自他不同项节点引入到该特定项节点的所有有向关系数量之总和。

在这里插入图片描述

图的经典表示法

邻接矩阵

在这里插入图片描述

1、在每一行里,在对应的单元格的位置上设置数值为1
2、在每一列里,在对应的单元格的位置上设置数值为2;这样做可以让通过该行或者该列的数据来快速计算出顶点度数

应用场景

在地图应用中寻找最短路径
社交网络关系
网页间超链接关系

理解Spark GraphX 数据模型

Spark GraphX 简介

GraphX是Spark提供的分布式图计算API工具
其主要特点包括:
基于内存实现了数据的共享复用以及高效的批量读取功能;
通过弹性扩展的分布式属性图(Property Graph)实现了异构数据视图的一体化管理;
与其相关的主流组件如Streaming、SQL以及ML库等实现了无缝集成。

GraphX核心抽象

具有弹性的分布式属性数据结构(Resilient Distributed Property Graph)在此模型中设计时,在其基础架构上实现了对动态变化的有效适应能力。该结构中每个节点和关系都携带对应的属性信息。

在这里插入图片描述
在这里插入图片描述

一份物理存储,两种视图

在这里插入图片描述

所有Graph视图的操作都会被转译为其关联的Table视图的RDD操作

掌握Spark GraphX API

Graph实例(V,D)

复制代码
    // 1.导入spark graphx包
    scala> import org.apache.spark.graphx._
    // 2.创建 vertices 顶点rdd
    scala> val vertices=sc.makeRDD(Seq( (1L,1),(2L,2),(3L,3) ))
    vertices: org.apache.spark.rdd.RDD[(Long, Int)] = ParallelCollectionRDD[0] at makeRDD at <console>:27
    // 3.创建edges  边rdd
    scala> val edges=sc.makeRDD(Seq( Edge(1L,2L,1),Edge(2L,3L,2) ))
    edges: org.apache.spark.rdd.RDD[org.apache.spark.graphx.Edge[Int]] = ParallelCollectionRDD[1] at makeRDD at <console>:27
    // 4.创建 graph 对象
    scala> val graph = Graph(vertices,edges)
    graph: org.apache.spark.graphx.Graph[Int,Int] = org.apache.spark.graphx.impl.GraphImpl@6a2d066d

获取graph对象后可以查看的信息:

复制代码
    // 获取graph图对象的 vertices 点信息
    scala> graph.vertices.collect
    res0: Array[(org.apache.spark.graphx.VertexId, Int)] = Array((1,1), (3,3), (2,2))
    // 获取graph图对象的 edges 边信息
    scala> graph.edges.collect
    res1: Array[org.apache.spark.graphx.Edge[Int]] = Array(Edge(1,2,1), Edge(2,3,2))
    // 获取graph图对象的 triplets 信息 会把顶点的全信息拉下来
    scala> graph.triplets.collect
    res2: Array[org.apache.spark.graphx.EdgeTriplet[Int,Int]] = Array(((1,1),(2,2),1), ((2,2),(3,3),2))

使用idea编写程序:需要在maven项目中添加依赖:

复制代码
    <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-graphx_2.11</artifactId>
    <version>2.1.1</version>
    </dependency>
复制代码
    import org.apache.spark.rdd.RDD
    import org.apache.spark.{SparkConf, SparkContext}
    object GraphX01 {
      def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("graphx")
    val sc = new SparkContext(conf)
    sc.setLogLevel("ERROR")
    import org.apache.spark.graphx._
    val vertices: RDD[(VertexId, PartitionID)] = sc.makeRDD(Seq( (1L,1),(2L,2),(3L,3) ))
    val edges: RDD[Edge[PartitionID]] = sc.makeRDD(Seq( Edge(1L,2L,1),Edge(2L,3L,2) ))
    val graph: Graph[PartitionID, PartitionID] = Graph(vertices,edges)
    graph.edges.collect().foreach(println)
    /*Edge(1,2,1)
    Edge(2,3,2)*/
    graph.vertices.collect().foreach(println)
    /*(1,1)
    (2,2)
    (3,3)*/
    graph.triplets.collect().foreach(println)
    /*((1,1),(2,2),1)
    ((2,2),(3,3),2)*/
      }
    }

使用加载文件的方式创建graph对象

复制代码
    [root@hadoop100 kb09file]# vi followers.txt
    [root@hadoop100 kb09file]# cat followers.txt
    2 3
    3 4
    1 4
    2 4
    //加载边列表文件创建图,文件每行描述一条边,格式:srcId dstId。顶点与边的属性均为1
    scala> val graphLoad = GraphLoader.edgeListFile(sc,"file:///opt/kb09file/followers.txt")
    grathLoad: org.apache.spark.graphx.Graph[Int,Int] = org.apache.spark.graphx.impl.GraphImpl@27870c76
    //顶点信息
    scala> graphLoad.vertices.collect
    res5: Array[(org.apache.spark.graphx.VertexId, Int)] = Array((4,1), (1,1), (3,1), (2,1))
    //边信息
    scala> graphLoad.edges.collect
    res6: Array[org.apache.spark.graphx.Edge[Int]] = Array(Edge(1,4,1), Edge(2,3,1), Edge(2,4,1), Edge(3,4,1))
    //triplets信息
    scala> graphLoad.triplets.collect
    res7: Array[org.apache.spark.graphx.EdgeTriplet[Int,Int]] = Array(((1,1),(4,1),1), ((2,1),(3,1),1), ((2,1),(4,1),1), ((3,1),(4,1),1))

在IDEA中的实现:

复制代码
    import org.apache.spark.{SparkConf, SparkContext}
    
    object GraphX02 {
      def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("graphx")
    val sc = new SparkContext(conf)
    sc.setLogLevel("ERROR")
    import org.apache.spark.graphx._
    val graph: Graph[PartitionID, PartitionID] = GraphLoader.edgeListFile(sc,"in/followers.txt")
    graph.edges.collect().foreach(println)
    /*Edge(2,3,1)
    Edge(3,4,1)
    Edge(1,4,1)
    Edge(2,4,1)*/
    graph.vertices.collect().foreach(println)
    /*(4,1)
    (2,1)
    (1,1)
    (3,1)*/
    graph.triplets.collect().foreach(println)
    /*((2,1),(3,1),1)
    ((3,1),(4,1),1)
    ((1,1),(4,1),1)
    ((2,1),(4,1),1)*/
      }
    }

案例:

在这里插入图片描述

构建用户合作关系属性图
顶点属性:用户名,职业
边属性:合作关系

复制代码
    // 1.创建 vertices 顶点rdd
    scala> val users = sc.parallelize(Array( (3L,("rxin","student")),(7L,("jgonzal","postdoc")),(5L,("franklin","professor")),(2L,("istoica","professor"))  ))
    users: org.apache.spark.rdd.RDD[(Long, (String, String))] = ParallelCollectionRDD[62] at parallelize at <console>:27
    // 2.创建 edges 边rdd
    scala> val relationship = sc.parallelize(Array( Edge(3L,7L,"Colla"),Edge(5L,3L,"Advisor"),Edge(2L,5L,"Colleague"),Edge(5L,7L,"PI")  ))
    relationship: org.apache.spark.rdd.RDD[org.apache.spark.graphx.Edge[String]] = ParallelCollectionRDD[63] at parallelize at <console>:27
    // 3.创建 graph 对象
    scala> val graphUser= Graph(users,relationship)
    graphUser: org.apache.spark.graphx.Graph[(String, String),String] = org.apache.spark.graphx.impl.GraphImpl@57f1635f

查看graph对象的相关信息:

复制代码
    //顶点信息
    scala> graphUser.vertices.collect
    res8: Array[(org.apache.spark.graphx.VertexId, (String, String))] = Array((3,(rxin,student)), (7,(jgonzal,postdoc)), (5,(franklin,professor)), (2,(istoica,professor)))
    //边信息
    scala> graphUser.edges.collect
    res9: Array[org.apache.spark.graphx.Edge[String]] = Array(Edge(2,5,Colleague), Edge(3,7,Colla), Edge(5,3,Advisor), Edge(5,7,PI))
    //triplets信息
    scala> graphUser.triplets.collect
    res10: Array[org.apache.spark.graphx.EdgeTriplet[(String, String),String]] = Array(((2,(istoica,professor)),(5,(franklin,professor)),Colleague), ((3,(rxin,student)),(7,(jgonzal,postdoc)),Colla), ((5,(franklin,professor)),(3,(rxin,student)),Advisor), ((5,(franklin,professor)),(7,(jgonzal,postdoc)),PI))

案例2:
搭建用户社交网络架构
顶点:账号、年龄
边:互动频率
筛选活跃度较高的账号

在这里插入图片描述

假设打call超过5次,表示真爱。请找出他(她)们

在这里插入图片描述
复制代码
    // 1.创建 vertices 顶点rdd
    scala> val userRdd = sc.parallelize(Array( (1L,("alice",28)),(2L,("Bob",27)),(3L,("Charlie",65)),(4L,("David",42)),(5L,("Ed",55)),(6L,("Fran",50))  ))
    // 2.创建 edges 边rdd
    scala> val usercallRdd = sc.parallelize(Array( Edge(2L,1L,7),Edge(2L,4L,2),Edge(3L,2L,4),Edge(3L,6L,3),Edge(4L,1L,1),Edge(5L,2L,2),Edge(5L,3L,8),Edge(5L,6L,3) ))
    // 3.创建 graph 对象
    scala> val userCallGraph= Graph(userRdd,usercallRdd)
    userCallGraph: org.apache.spark.graphx.Graph[(String, Int),Int] = org.apache.spark.graphx.impl.GraphImpl@1a9732d1

过滤出年龄大于30的用户:

复制代码
    scala> userCallGraph.vertices.filter(v=>v._2._2>30).collect.foreach(println)
    (4,(David,42))
    (6,(Fran,50))
    (3,(Charlie,65))
    (5,(Ed,55))
    scala> userCallGraph.vertices.filter(v=>v._2._2>30).collect.foreach(x=>println("name:"+x._2._1+" age:"+x._2._2))
    name:David age:42
    name:Fran age:50
    name:Charlie age:65
    name:Ed age:55
    scala> userCallGraph.vertices.filter{ case (id,(name,age)) => age>30 }.collect.foreach(println)
    (4,(David,42))
    (6,(Fran,50))
    (3,(Charlie,65))
    (5,(Ed,55))

假设打call超过5次,表示真爱。请找出他(她)们:

复制代码
    scala> userCallGraph.triplets
    res19: org.apache.spark.rdd.RDD[org.apache.spark.graphx.EdgeTriplet[(String, Int),Int]] = MapPartitionsRDD[113] at mapPartitions at GraphImpl.scala:48
    
    scala> userCallGraph.triplets.collect.foreach(println)
    ((2,(Bob,27)),(1,(alice,28)),7)
    ((2,(Bob,27)),(4,(David,42)),2)
    ((3,(Charlie,65)),(2,(Bob,27)),4)
    ((3,(Charlie,65)),(6,(Fran,50)),3)
    ((4,(David,42)),(1,(alice,28)),1)
    ((5,(Ed,55)),(2,(Bob,27)),2)
    ((5,(Ed,55)),(3,(Charlie,65)),8)
    ((5,(Ed,55)),(6,(Fran,50)),3)
    //统计打call的次数
    scala> userCallGraph.triplets.collect.foreach(x=>println(x.srcAttr._1 +"喜欢"+ x.dstAttr._1 + "喜欢指数:" + x.attr) )
    Bob喜欢alice喜欢指数:7
    Bob喜欢David喜欢指数:2
    Charlie喜欢Bob喜欢指数:4
    Charlie喜欢Fran喜欢指数:3
    David喜欢alice喜欢指数:1
    Ed喜欢Bob喜欢指数:2
    Ed喜欢Charlie喜欢指数:8
    Ed喜欢Fran喜欢指数:3
    //符合条件打call次数大于5次的
    scala> userCallGraph.triplets.filter(x=>x.attr>5).collect.foreach(x=>println(x.srcAttr._1 +"喜欢"+ x.dstAttr._1 + "喜欢指数:" + x.attr) )
    Bob喜欢alice喜欢指数:7
    Ed喜欢Charlie喜欢指数:8

在IDEA中实现:

复制代码
    import org.apache.spark.rdd.RDD
    import org.apache.spark.{SparkConf, SparkContext}
    
    object GraphX03 {
      def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("graphx")
    val sc = new SparkContext(conf)
    sc.setLogLevel("ERROR")
    import org.apache.spark.graphx._
    val userRdd: RDD[(VertexId, (String, PartitionID))] = sc.parallelize(
      Array( (1L,("alice",28))
        ,(2L,("Bob",27))
        ,(3L,("Charlie",65))
        ,(4L,("David",42))
        ,(5L,("Ed",55))
        ,(6L,("Fran",50))
      ))
    val usercallRdd: RDD[Edge[PartitionID]] = sc.parallelize(
      Array(
        Edge(2L,1L,7)
        ,Edge(2L,4L,2)
        ,Edge(3L,2L,4)
        ,Edge(3L,6L,3)
        ,Edge(4L,1L,1)
        ,Edge(5L,2L,2)
        ,Edge(5L,3L,8)
        ,Edge(5L,6L,3)
      ))
    val userCallGraph: Graph[(String, PartitionID), PartitionID] = Graph(userRdd,usercallRdd)
    
    userCallGraph.vertices.filter(x=>x._2._2>30).collect().foreach(x=>println("name: " + x._2._1 +",age: "+x._2._2))
    /*name: Charlie,age: 65
    name: David,age: 42
    name: Ed,age: 55
    name: Fran,age: 50*/
    userCallGraph.vertices.filter{ case (id,(name,age)) => age>30 }.collect().foreach(println)
    /*(3,(Charlie,65))
    (4,(David,42))
    (5,(Ed,55))
    (6,(Fran,50))*/
    userCallGraph.triplets.collect().foreach(println) //RDD[EdgeTriplet[(String, PartitionID), PartitionID]]
    /*((2,(Bob,27)),(1,(alice,28)),7)
    ((2,(Bob,27)),(4,(David,42)),2)
    ((3,(Charlie,65)),(2,(Bob,27)),4)
    ((3,(Charlie,65)),(6,(Fran,50)),3)
    ((4,(David,42)),(1,(alice,28)),1)
    ((5,(Ed,55)),(2,(Bob,27)),2)
    ((5,(Ed,55)),(3,(Charlie,65)),8)
    ((5,(Ed,55)),(6,(Fran,50)),3)*/
    userCallGraph.triplets.filter(x=>x.attr>5).collect.foreach(x=>println(x.srcAttr._1 +"喜欢"+ x.dstAttr._1 + "喜欢指数:" + x.attr) )
    /*Bob喜欢alice喜欢指数:7
    Ed喜欢Charlie喜欢指数:8*/
    userCallGraph.edges.filter{case Edge(src,dst,attr) => attr>5}.collect().foreach(println)
    /*Edge(2,1,7)
    Edge(5,3,8)*/
      }
    }

掌握Spark GraphX 图算法

PageRank

Connected Component

掌握Spark GraphX Pregel

在课堂练习中,请确定哪些网络红人?需求说明如下:
数据文件为 twitter-graph-data.txt。
格式定义如下:每条记录由两个元组构成( username, userId ),其中前者代表被关注者( followee ),后者代表关注者( follower )。
请构建该社交网络图谱,并计算每位用户的关注人数。
最后,请识别出最受欢迎的网络红人。

复制代码
    ((User47,86566510),(User83,15647839))
    ((User47,86566510),(User42,197134784))
    ((User89,74286565),(User49,19315174))
    ((User16,22679419),(User69,45705189))
    ((User37,14559570),(User64,24742040))
    ((User31,63644892),(User10,123004655))
    ((User10,123004655),(User50,17613979))
    ((User37,14559570),(User11,14269220))
    ((User78,3365291),(User30,93905958))
    ((User14,199097645),(User60,16547411))
    ((User3,14874480),(User42,197134784))
    ((User40,813286),(User9,15434432))
    ((User10,123004655),(User34,10211502))
    ((User90,34870269),(User53,25566593))
    ((User12,24741956),(User60,16547411))
    ((User12,24741956),(User5,18927441))
    ((User37,14559570),(User39,22831657))
    ((User89,74286565),(User32,15670515))
    ((User28,7465732),(User65,90569268))
    ((User89,74286565),(User9,15434432))
    ------仅展示20条数据文件----------------

代码实现,这里使用到了正则匹配

复制代码
    object Demo08 {
      def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession.builder()
      .appName(this.getClass.getName)
      .master("local[4]")
      .getOrCreate()
    val sc: SparkContext = spark.sparkContext
    sc.setLogLevel("ERROR")
    import org.apache.spark.graphx._
    // \(\(User47,86566510\),\(User83,15647839\)\)
    // 编写正则表达式,用于提取字段:
    // \(\((User\d+,\d+)\),\((User\d+,\d+)\)\)
    
    // 正则表达式模式匹配后返回两组值,分别对这两组值进行切割处理
    val r: Regex = """\(\((User\d+,\d+)\),\((User\d+,\d+)\)\)""".r
    // 读取数据文件
    val twitters: RDD[(Option[String], Option[String])] = sc.textFile("data/twitter_graph_data.txt")
      .map(x => x match {
        case r(followee, follower) => (Some(followee), Some(follower))
        case _ => (None, None)
      }).filter(x => x._1 != None && x._2 != None)
      // flatMap对数据进行扁平化处理
      // 也可以使用union对两个RDD求并集,这里不做演示
    val verts: RDD[(VertexId, String)] = twitters.flatMap(x => {
      val str1: Array[String] = x._1.get.split(",")
      val str2: Array[String] = x._2.get.split(",")
      Array((str1(1).toLong, str1(0)), (str2(1).toLong, str2(0)))
    }).distinct()
    val edges: RDD[Edge[String]] = twitters.map(x => {
      val str1: Array[String] = x._1.get.split(",")
      val str2: Array[String] = x._2.get.split(",")
      Edge(str2(1).toLong, str1(1).toLong,"follow")
    })
    val graph: Graph[String, String] = Graph(verts,edges,"")
    // 求入度,按照入度的值进行倒序排列
    // 注意:要想实现全局有序,需要重新分成一个分区
    graph.inDegrees.repartition(1).sortByKey().foreach(println)
      }
    }

全部评论 (0)

还没有任何评论哟~