Spark-Graphx系列-基础用法
GraphX是一个基于Spark的并行图计算框架,它利用图中的集合算法和迭代图计算功能高效地处理大规模图数据。其核心在于将抽象的图计算逻辑与高效的Spark执行机制相结合,在GraphX中定义了顶点类Tag和边类Tag,并实现了多种图算法的基本操作。
一.简介
spark graphx 是一种并行化的图计算框架, 基于 spark 平台提供了丰富且易于使用的接口, 大大地简化了对饼形图的处理需求. 在 graphx 中, 图的核心数据结构类由边集 RDD 和顶点集 RDD 组成.
相较于现有的其他图处理系统及图数据库而言,在GraphX中构建其核心优势体系时
二.特点
灵活性高
图形实现了与集合之间的无缝集成。该框架实现了单个系统内的ETL流程、探索性分析以及迭代式图计算功能。您可访问与图形数据及集合数据具有同等价值的数据库。通过有效的RDD转换操作以及利用Pregel API实现自定义迭代式图算法。
速度快
可以与最快的专业图形处理系统相媲美。
GraphX在性能上与当前最领先的图形处理系统展开较量,并在实现高效数据处理的同时成功继承了Spark系统的灵活配置性、完善的容错机制以及简便易用性的特点。
算法多
从不断增加的图算法库中进行选择。
基于高度灵活的API框架...GraphX不仅提供了丰富的图形算法库...其中一些是由我们的用户贡献开发的...它不仅支持经典的网页排名算法(如PageRank)、连通组件分析工具(如Strongly Connected Components)、标签传播模型(如Label Propagation Algorithm)以及SVD++分解技术(Singular Value Decomposition++),同时也具备强大的连通组件分析工具和三角形计数算法等核心功能。
三.案例
def main(args: Array[String]): Unit = {
val session = SparkSession.builder().appName("GraphxDemo10").master("local[*]").getOrCreate()
val sc = session.sparkContext
//创建顶点数据集
val vertexRDD:RDD[(VertexId,(String,String))] = sc.makeRDD(Array(
(3L,("zhangsan","student")),
(7L,("wangchen","博士后")),
(5L,("zhangyu","教授")),
(2L,("wangguo","教授"))
))
//创建边的数据
val edgesRDD:RDD[Edge[String]] = sc.makeRDD(Array(
Edge(3L,7L,"合作者"),
Edge(5L,3L,"指导"),
Edge(2L,5L,"同事"),
Edge(5L,7L,"同事")
))
//构建一个图
val graphx = Graph(vertexRDD,edgesRDD)
//RDD展示
val result = graphx.triplets.collect()
result.foreach( triplet =>
println(s"srcId=${triplet.srcId} srcAttr=${triplet.srcAttr}--edge=${triplet.attr}--dstId=${triplet.dstId} dstAttr=${triplet.dstAttr} ")
)
session.stop()
}
结果
src ID为3的srcAttr为(zhangsan, student),与dst ID为7的dstAttr为(wangchen, 博士后)之间存在一种关系——边
srcId=5 srcAttr=(zhangyu,教授)–edge=指导–dstId=3 dstAttr=(zhangsan,student)
srcId=2 srcAttr=(wangguo,教授)–edge=同事–dstId=5 dstAttr=(zhangyu,教授)
srcId=5 srcAttr=(zhangyu,教授)–edge=同事–dstId=7 dstAttr=(wangchen,博士后)
四.创建
def apply[VD: ClassTag, ED: ClassTag]
根据边直接创建, 所有顶点的属性都一样为 defaultValue
def fromEdges[VD: ClassTag, ED: ClassTag]
def fromEdgeTuples[VD: ClassTag]
五.转换操作
5.1 基本信息
获取边的数量
val numEdges: Long
获取顶点的数量
val numVertices: Long
获取所有顶点的入度
val inDegrees: VertexRDD[Int]
获取所有顶点的出度
val outDegrees: VertexRDD[Int]
获取所有顶点入度与出度之和
val degrees: VertexRDD[Int]
查看所有边
val edges: EdgeRDD[ED]
所有顶点属性
val vertices: VertexRDD[VD]
边和顶点属性(包含id)
val triplets: RDD[EdgeTriplet[VD, ED]]
5.2 转换操作
在GraphX框架中,转换操作主要包括三个关键组件:mapVertices、mapEdges以及mapTriplets。这些组件由Graph文件定义,并由GraphImpl负责实现。
5.2.1 mapVertices
该函数用于更新图中各顶点的属性信息。根据前面章节的内容可知,在图结构中,每个顶点都与对应的边分区相关联。因此我们需要修改的是这些关联到各 vertex 的 edge partitions 中的 attribute 信息。对该 graph 的所有 vertex 逐一应用 map 操作,在该操作过程中, 所有被处理 vertex 的 ID 均保持不变, 可通过此映射过程将各 vertex 的相关数据类型转换为所需的形式。
graphx.mapVertices((id,attr)=>attr._1)
.triplets
.collect
.foreach(triplet => println(s"srcId=${triplet.srcId} srcAttr=${triplet.srcAttr}--edge=${triplet.attr}--dstId=${triplet.dstId} dstAttr=${triplet.dstAttr} "))
结果
srcId=3 srcAttr=zhangsan–edge=合作者–dstId=7 dstAttr=wangchen
srcId=5 srcAttr=zhangyu–edge=指导–dstId=3 dstAttr=zhangsan
srcId=2 srcAttr=wangguo–edge=同事–dstId=5 dstAttr=zhangyu
srcId=5 srcAttr=zhangyu–edge=同事–dstId=7 dstAttr=wangchen
5.2.2 mapEdges
执行映射操作于图中的每一条边,并规定各边的方向不允许修改;同时允许将其属性转换为另一种类型。
graphx.mapEdges(attr=>{
1
}) .triplets
.collect
.foreach(triplet => println(s"srcId=${triplet.srcId} srcAttr=${triplet.srcAttr}--edge=${triplet.attr}--dstId=${triplet.dstId} dstAttr=${triplet.dstAttr} "))
结果
srcId=3 srcAttr=(zhangsan,student)–edge=1–dstId=7 dstAttr=(wangchen,博士后)
srcId=5 srcAttr=(zhangyu,教授)–edge=1–dstId=3 dstAttr=(zhangsan,student)
srcId=2 srcAttr=(wangguo,教授)–edge=1–dstId=5 dstAttr=(zhangyu,教授)
srcId=5 srcAttr=(zhangyu,教授)–edge=1–dstId=7 dstAttr=(wangchen,博士后)
5.2.3 mapTriplets
该函数定义了一个名为"functionName"并接收一种特定类型的参数的数据结构。该数据结构由EdgeTriplet实例组成,并且该过程无论处理何种情况都将始终遵循相同的逻辑流程。同时确保返回值类型为ED2
def mapTriplets[ED2: ClassTag](map: EdgeTriplet[VD, ED] => ED2): Graph[VD, ED2]
graphx.mapTriplets(e=>{
s"${e.srcAttr}:${e.dstAttr}:${e.attr}"
}).triplets
.collect
.foreach(triplet => println(s"srcId=${triplet.srcId} srcAttr=${triplet.srcAttr} edge=${triplet.attr} dstId=${triplet.dstId} dstAttr=${triplet.dstAttr} "))
结果
源节点ID=3对应的源属性信息为[张三的学生];边连接的是[张三的学生]到[王晨的博士后]的合作单位;目标节点ID=7对应的属性信息为[王晨的博士后]。
source ID = 5; source attribute = (张宇, 教授); edge between (张宇, 教授)和(zhangyu,教授):指导; destination ID = 3; destination attribute = (张三, 学生);
src标识为2;该属性包含国家:中国和职称:教授;边属性包含国家:中国、职称:教授,并连接至同属一个机构的同事:张宇教授;dst标识为5
起始节点ID为5;起始属性为(张宇/教授)。边由(张宇/教授)指向(王陈/博士后),并带有从(张宇/教授)至(王陈/博士后)的任务属性:同事关系。终止节点ID为7;终止属性为(王陈/博士后)。
5.3 结构操作
5.3.1 reverse
反向操作会生成一个新的图,并使该图的所有边的方向发生逆转。例如,该方法可用于计算反向PageRank值。由于反向操作并未更改顶点或边属性,并且数量保持不变,在无需移动或复制数据的情况下就可以高效地实现这一功能。
def reverse: Graph[VD, ED]
//正向
graphx.triplets.collect().foreach(triplet=>{
println(s"srcId=${triplet.srcId} srcAttr=${triplet.srcAttr} --edge=${triplet.attr} --dstId=${triplet.dstId} dstAttr=${triplet.dstAttr} ")
})
//反向
graphx.reverse.triplets
.collect
.foreach(triplet => println(s"srcId=${triplet.srcId} srcAttr=${triplet.srcAttr}--edge=${triplet.attr}--dstId=${triplet.dstId} dstAttr=${triplet.dstAttr} "))
结果
正向
源ID为3、属性为(张三、学生);通过合作角色;目标ID为7、属性为(王晨、博士后)。
srcId=5 srcAttr=(zhangyu,教授) --edge=指导 --dstId=3 dstAttr=(zhangsan,student)
srcId=2 srcAttr=(wangguo,教授) --edge=同事 --dstId=5 dstAttr=(zhangyu,教授)
srcId=5 srcAttr=(zhangyu,教授) --edge=同事 --dstId=7 dstAttr=(wangchen,博士后)
反向
src节点ID为7的节点与其关联的属性信息为(wangchen, 博士后),通过"合作者"这一连接关系指向另一个节点
srcId=3 srcAttr=(zhangsan,student)–edge=指导–dstId=5 dstAttr=(zhangyu,教授)
srcId=5 srcAttr=(zhangyu,教授)–edge=同事–dstId=2 dstAttr=(wangguo,教授)
srcId=7 srcAttr=(wangchen,博士后)–edge=同事–dstId=5 dstAttr=(zhangyu,教授)
5.3.2 subgraph
基于vertex和edge上的谓词(predicates)运算,subgraph操作生成的结果图仅包含符合vertex谓词条件的所有vertex、符合edge谓词条件的所有edge以及这些vertex之间的关联关系。该操作适用于多种应用场景,在分析中可灵活用于提取具有特定属性的相关部分图或修复网络结构中的断裂连接。
def subgraph(
epred: EdgeTriplet[VD, ED] => Boolean = (x => true),
vpred: (VertexId, VD) => Boolean = ((v, d) => true))
: Graph[VD, ED]
//满足三元组边判断,满足顶点的判定
graphx.subgraph(x=>if(x.attr=="同事") true else false,(id,attr)=>true).triplets
.collect
.foreach(triplet => println(s"srcId=${triplet.srcId} srcAttr=${triplet.srcAttr}--edge=${triplet.attr}--dstId=${triplet.dstId} dstAttr=${triplet.dstAttr} "))
结果
srcId=2 srcAttr=(wangguo,教授)–edge=同事–dstId=5 dstAttr=(zhangyu,教授)
srcId=5 srcAttr=(zhangyu,教授)–edge=同事–dstId=7 dstAttr=(wangchen,博士后)
5.3.3 mask
通过mask操作构建相应的子图G',其特性类似于两个集合的交集运算结果。该子图G'仅包括原始输入图中的顶点与边,并通过内联连接(inner join)操作实现其构建过程。该方法能够与subgraph相关的方法协同工作,在特定条件下可结合另一个相关图的信息来限定范围。为了提高效率,在实际应用中我们仅根据节点ID进行匹配比较而不考虑其他属性信息。
def mask[VD2: ClassTag, ED2: ClassTag](other: Graph[VD2, ED2]): Graph[VD, ED]
//原貌
graphx.triplets.collect().foreach(triplet=>{
println(s"srcId=${triplet.srcId} srcAttr=${triplet.srcAttr} --edge=${triplet.attr} --dstId=${triplet.dstId} dstAttr=${triplet.dstAttr} ")
})
//mask后
val validGraph = graphx.subgraph(vpred = (id, attr) => attr._2 != "student")
graphx.mask(validGraph).triplets
.collect
.foreach(triplet => println(s"srcId=${triplet.srcId} srcAttr=${triplet.srcAttr}--edge=${triplet.attr}--dstId=${triplet.dstId} dstAttr=${triplet.dstAttr} "))
结果
原貌
src节点ID为3的源节点与其对应的属性信息(张三及其专业的学生)与 合作者 连接至 目标节点ID为7处,并携带其对应的目标属性信息(王陈及其专业的博士后研究人员)。
srcId=5 srcAttr=(zhangyu,教授) --edge=指导 --dstId=3 dstAttr=(zhangsan,student)
srcId=2 srcAttr=(wangguo,教授) --edge=同事 --dstId=5 dstAttr=(zhangyu,教授)
srcId=5 srcAttr=(zhangyu,教授) --edge=同事 --dstId=7 dstAttr=(wangchen,博士后)
mask后
srcId=2 srcAttr=(wangguo,教授)–edge=同事–dstId=5 dstAttr=(zhangyu,教授)
srcId=5 srcAttr=(zhangyu,教授)–edge=同事–dstId=7 dstAttr=(wangchen,博士后)
5.3.4 groupEdges
注意
注意
graphx.groupEdges((e1,e2)=>e1+e2).triplets.collect().foreach(triplet=>println(s"srcId=${triplet.srcId} srcAttr=${triplet.srcAttr}--edge=${triplet.attr}--dstId=${triplet.dstId} dstAttr=${triplet.dstAttr} "))
结果
原貌
srcId=2 srcAttr=(wangguo,教授) --edge=同事 --dstId=5 dstAttr=(zhangyu,教授)
src_id = 3;src_attr = (张三;学生);边属性为"合作人";dst_id = 7;dst_attr = (王陈;博士后)
srcId=5 srcAttr=(zhangyu,教授) --edge=指导 --dstId=3 dstAttr=(zhangsan,student)
srcId=5 srcAttr=(zhangyu,教授) --edge=同事 --dstId=7 dstAttr=(wangchen,博士后)
srcId=5 srcAttr=(zhangyu,教授) --edge=222 --dstId=7 dstAttr=(wangchen,博士后)
groupEdges后
srcId=2 srcAttr=(wangguo,教授)–edge=同事–dstId=5 dstAttr=(zhangyu,教授)
src标识为3的源属性(zhangsan,学生)与协作对象 dst标识为7的目标属性(wangchen,博士后研究人员)之间建立了关联
srcId=5 srcAttr=(zhangyu,教授)–edge=指导–dstId=3 dstAttr=(zhangsan,student)
srcId=5 srcAttr=(zhangyu,教授)–edge=同事222–dstId=7 dstAttr=(wangchen,博士后)
5.4 聚合
5.4.1 collectNeighbors
该方法的功能是获取每一个顶点所有相连顶点的ID及其属性信息。必须明确方向
EdgeDirection.out:出的方向
EdgeDirection.in:入的方向
EdgeDirection.Either:入边或出边
EdgeDirection.Both:入边且出边
def collectNeighbors(edgeDirection: EdgeDirection): VertexRDD[Array[(VertexId, VD)]]: "获取"邻居节点的信息数据"依据"指定的方向"参数展开操作。操作完成后将结果存储于一个顶点对 RDD中。
graphx.collectNeighbors(EdgeDirection.Either).collect().foreach(f=>{
val colls : Array[(VertexId, (String, String))] = f._2
println(s"VertexId:${f._1} ${colls.foreach(print(_))}")
})
5.4.2 collectNeighborIds
该方法的功能是获取每个顶点邻接顶点的ID值。其实现过程与collectNeighbors函数高度相似。为了确保正确性,请明确方向。
def collectNeighborIds(edgeDirection: EdgeDirection): 返回类型为 VertexRDD[Array[VertexId]]
graphx.collectNeighborIds(EdgeDirection.Either).collect().foreach(f=>{
val colls :Array[(VertexId)] = f._2
println(s"VertexId:${f._1} ${colls.foreach(print(_))}")
})
5.4.3 aggregateMessages
val aggregateMessages[A: ClassTag](sendMsg: EdgeContext[VD, ED, A] => Unit, mergeFunction: (A,A)=>A): VertexRDD[A] = aggregateMessages
每一条边将调用sendMsg方法发送一个消息;每个顶点将调用mergeMsg方法来处理它所接收的所有消息。
TripletFields中存在一个用于配置EdgeContext对象的属性值是否存在?其目的是为了降低数据通信量。
/** * 聚合到尾结点,去在所有起始节点找最大
*/
val graph3Value: VertexRDD[Int] = graph3.aggregateMessages[Int](triplet => {
triplet.sendToDst(triplet.srcAttr._2)
},
(a, b) => math.max(a, b))
graph3Value.collect().foreach(println(_))
结果
原貌
srcId=1 srcAttr=(zhang1,25) --edge=1 --dstId=2 dstAttr=(zhang2,18)
srcId=3 srcAttr=(zhang3,45) --edge=1 --dstId=2 dstAttr=(zhang2,18)
srcId=1 srcAttr=(zhang1,25) --edge=1 --dstId=4 dstAttr=(zhang4,20)
srcId=7 srcAttr=(zhang7,101) --edge=1 --dstId=2 dstAttr=(zhang2,18)
srcId=3 srcAttr=(zhang3,45) --edge=1 --dstId=5 dstAttr=(zhang5,70)
srcId=7 srcAttr=(zhang7,101) --edge=1 --dstId=6 dstAttr=(zhang6,79)
srcId=6 srcAttr=(zhang6,79) --edge=1 --dstId=4 dstAttr=(zhang4,20)
aggregateMessages:
(2,101)
(4,79)
(5,45)
(6,101)
5.5 关联操作
5.5.1 joinVertices
def vertexJoin[U: ClassTag](一个包含RDD类型的参数table:RDD[(VertexId, U)])(
mapFunc: (VertexId, VD, U) => VD
):Graph[VD, ED]
对具有相同顶点ID的数据进行加权处理;将U类别的数据整合至 VD类别中;此操作需确保 VD类别的属性不发生变更。
val vertexRDD4 = sc.makeRDD(Array(
(1L,("test1",61)),
(2L,("test2",62)),
(3L,("test3",63))
))
graph3.joinVertices(vertexRDD4)((v, vd, u) => {
(u._1, u._2)
}).triplets.collect().foreach(triplet=>{
println(s"srcId=${triplet.srcId} srcAttr=${triplet.srcAttr} --edge=${triplet.attr} --dstId=${triplet.dstId} dstAttr=${triplet.dstAttr} ")
})
结果
原貌
srcId=1 srcAttr=(zhang1,25) --edge=1 --dstId=2 dstAttr=(zhang2,18)
srcId=3 srcAttr=(zhang3,45) --edge=1 --dstId=2 dstAttr=(zhang2,18)
srcId=1 srcAttr=(zhang1,25) --edge=1 --dstId=4 dstAttr=(zhang4,20)
srcId=7 srcAttr=(zhang7,101) --edge=1 --dstId=2 dstAttr=(zhang2,18)
srcId=3 srcAttr=(zhang3,45) --edge=1 --dstId=5 dstAttr=(zhang5,70)
srcId=7 srcAttr=(zhang7,101) --edge=1 --dstId=6 dstAttr=(zhang6,79)
srcId=6 srcAttr=(zhang6,79) --edge=1 --dstId=4 dstAttr=(zhang4,20)
joinVertices
srcId=1 srcAttr=(test1,61) --edge=1 --dstId=2 dstAttr=(test2,62)
srcId=3 srcAttr=(test3,63) --edge=1 --dstId=2 dstAttr=(test2,62)
srcId=1 srcAttr=(test1,61) --edge=1 --dstId=4 dstAttr=(zhang4,20)
srcId=7 srcAttr=(zhang7,101) --edge=1 --dstId=2 dstAttr=(test2,62)
srcId=3 srcAttr=(test3,63) --edge=1 --dstId=5 dstAttr=(zhang5,70)
srcId=7 srcAttr=(zhang7,101) --edge=1 --dstId=6 dstAttr=(zhang6,79)
srcId=6 srcAttr=(zhang6,79) --edge=1 --dstId=4 dstAttr=(zhang4,20)
5.5.2 outerJoinVertices
此函数实现了外部顶点连接操作(将其他数据集与当前图中的顶点进行关联),其参数包括其他数据集、映射函数以及显式类型等价关系。
类似于joinVertices的做法,在缺少对应节点的情况下,默认值被设置为None。
val vertexRDD4 = sc.makeRDD(Array(
(1L,("test1",61)),
(2L,("test2",62)),
(3L,("test3",63))
))
graph3.outerJoinVertices(vertexRDD4)((v, vd, u) => {
u.getOrElse(null)
}).triplets.collect().foreach(triplet=>{
println(s"srcId=${triplet.srcId} srcAttr=${triplet.srcAttr} --edge=${triplet.attr} --dstId=${triplet.dstId} dstAttr=${triplet.dstAttr} ")
})
结果
原貌
srcId=1 srcAttr=(zhang1,25) --edge=1 --dstId=2 dstAttr=(zhang2,18)
srcId=3 srcAttr=(zhang3,45) --edge=1 --dstId=2 dstAttr=(zhang2,18)
srcId=1 srcAttr=(zhang1,25) --edge=1 --dstId=4 dstAttr=(zhang4,20)
srcId=7 srcAttr=(zhang7,101) --edge=1 --dstId=2 dstAttr=(zhang2,18)
srcId=3 srcAttr=(zhang3,45) --edge=1 --dstId=5 dstAttr=(zhang5,70)
srcId=7 srcAttr=(zhang7,101) --edge=1 --dstId=6 dstAttr=(zhang6,79)
srcId=6 srcAttr=(zhang6,79) --edge=1 --dstId=4 dstAttr=(zhang4,20)
outerJoinVertices
srcId=1 srcAttr=(test1,61) --edge=1 --dstId=2 dstAttr=(test2,62)
srcId=3 srcAttr=(test3,63) --edge=1 --dstId=2 dstAttr=(test2,62)
srcId=1 srcAttr=(test1,61) --edge=1 --dstId=4 dstAttr=null
srcId=7 srcAttr=null --edge=1 --dstId=2 dstAttr=(test2,62)
srcId=3 srcAttr=(test3,63) --edge=1 --dstId=5 dstAttr=null
srcId=7 srcAttr=null --edge=1 --dstId=6 dstAttr=null
srcId=6 srcAttr=null --edge=1 --dstId=4 dstAttr=null
5.6 Pregel
def pregel[A: ClassTag]
(
initialMsg: A,//图初始化的时候,开始模型计算的时候,所有节点都会收到的一个默认的消息
maxIterations: Int = Int.MaxValue,//最大的迭代次数
activeDirection: EdgeDirection = EdgeDirection.Either)//发送消息的方向
(vprog: (VertexId, VD, A) => VD,//节点调用该消息将聚合后的数据和本节点进行属性的合并
sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],//激活态的节点调用这个方法发送消息
mergeMsg: (A, A) => A)//如果一个节点收到多条消息,那么就会使用mergeMsg将消息合并为一条消息,如果只接收到一条消息,则不合并
5.6.1 最短路径
单一起点最短路径算法旨在计算任意顶点至起始顶点的最短路径。其核心特征是基于起始顶点逐步向外扩展路径直至抵达目标顶点。
val vertexRDD4:RDD[(VertexId,Int)] = sc.makeRDD(Array(
(1L,0),
(2L,Int.MaxValue),
(3L,Int.MaxValue),
(4L,Int.MaxValue),
(5L,Int.MaxValue),
(6L,Int.MaxValue),
(7L,Int.MaxValue),
(8L,Int.MaxValue),
(9L,Int.MaxValue)
))
val edgesRDD4 = sc.makeRDD(Array(
Edge(1L,2L,6),
Edge(1L,3L,3),
Edge(1L,4L,1),
Edge(3L,2L,2),
Edge(3L,4L,2),
Edge(2L,5L,1),
Edge(5L,4L,6),
Edge(5L,6L,4),
Edge(6L,5L,10),
Edge(5L,7L,3),
Edge(5L,8L,6),
Edge(4L,6L,10),
Edge(6L,7L,2),
Edge(7L,8L,4),
Edge(9L,5L,2),
Edge(9L,8L,3)
))
val graph4 = Graph(vertexRDD4,edgesRDD4)
/** * 第一个参数是 初始消息,面向所有节点,使用一次vprog来更新节点的值
* 第二迭代次数
* 第三个参数 消息发送方向 EdgeDirection.Out out代表源节点-》目标节点
* 第一个函数vprog 更新节点程序
* 第二函数 sendMsg 发送消息函数(主处理)
* 第三个函数 合并
* */
val sssp = graph4.pregel(Int.MaxValue)(
(id, dist, newDist) => math.min(dist, newDist), // Vertex Program
triplet => { // Send Message
if (triplet.srcAttr!= Int.MaxValue && triplet.srcAttr + triplet.attr < triplet.dstAttr) {
Iterator((triplet.dstId, triplet.srcAttr + triplet.attr))
} else {
Iterator.empty
}
},
(a, b) => math.min(a, b) // Merge Message
)
println("sssp:");
sssp.triplets
.collect
.foreach(triplet => println(s"srcId=${triplet.srcId} srcAttr=${triplet.srcAttr}--edge=${triplet.attr}--dstId=${triplet.dstId} dstAttr=${triplet.dstAttr} "))
最短路径算法:计算的是从V1到所有顶点的最短路径
5.6.2 二跳邻居
def vprog(vid:VertexId,vdata:VMap,message:VMap): Map[VertexId,Int] ={
addMaps(vdata,message)
}
def addMaps(spmap1:VMap,spmap2:VMap): VMap ={
val ids: Set[VertexId] = spmap1.keySet++spmap2.keySet
ids.map(k=>{
k ->math.min(spmap1.getOrElse(k,Int.MaxValue),spmap2.getOrElse(k,Int.MaxValue))
}).toMap
}
def merMaps(spmap1:VMap,spmap2:VMap): VMap ={
val ids: Set[VertexId] = spmap1.keySet++spmap2.keySet
ids.map(k=>{
k ->math.min(spmap1.getOrElse(k,Int.MaxValue),spmap2.getOrElse(k,Int.MaxValue))
}).toMap
}
def sendMsg(e:EdgeTriplet[VMap,_]):Iterator[(VertexId,Map[VertexId,Int])] ={
//两个集合的差集
val srcMap = (e.dstAttr.keySet -- e.srcAttr.keySet).map(k=>{k->(e.dstAttr(k)-1)}).toMap
val dstMap = (e.srcAttr.keySet -- e.dstAttr.keySet).map(k=>{k->(e.srcAttr(k)-1)}).toMap
if(srcMap.size == 0 && dstMap.size == 0) Iterator.empty
else {
Iterator((e.dstId, dstMap), (e.srcId, srcMap))
}
}
def main(args: Array[String]): Unit = {
/** * //pregel参数
* //第一个参数 Map[VertexId, Int]() ,是初始消息,面向所有节点,使用一次vprog来更新节点的值,由于Map[VertexId, Int]()是一个空map类型,所以相当于初始消息什么都没做
* //第二个参数 two,是迭代次数,此时two=2,代表迭代两次(进行两轮的active节点发送消息),第一轮所有节点都是active节点,第二轮收到消息的节点才是active节点。
* //第三个参数 EdgeDirection.Out,是消息发送方向,out代表源节点-》目标节点 这个方向 //pregel 函数参数
* //第一个函数 vprog,是用户更新节点数据的程序,此时vprog又调用了addMaps
* //第二个函数 sendMsg,是发送消息的函数,此时用目标节点的map与源节点的map做差,将差集的数据减一;然后同样用源节点的map与目标节点的map做差,同样差集的数据减一
* //第一轮迭代,由于所有节点都只存着自己和2这个键值对,所以对于两个用户之间存在变关系的一对点,都会收到对方的一条消息,内容是(本节点,1)和(对方节点,1)这两个键值对
* //第二轮迭代,收到消息的节点会再一次的沿着边发送消息,此时消息的内容变成了(自己的朋友,0)
* //第三个函数 addMaps, 是合并消息,将map合并(相当于求个并集),不过如果有交集(key相同),那么,交集中的key取值(value)为最小的值。
*/
val newG=g1.pregel(Map[VertexId,Int](),two,EdgeDirection.Out)(vprog,sendMsg,merMaps)
println("newG vertices")
newG.vertices.mapValues(_.filter(_._2 == 0).keys).filter(!_._2.isEmpty).collect().foreach(println(_))
}
结果
(1,Set(5, 6, 4))
(2,Set(5, 6, 4))
(4,Set(1, 6, 2))
(5,Set(1, 2))
(6,Set(1, 2, 4))
六.PageRank算法
6.1 原理
PageRank让链接来"投票"
一个网页的评分由所有指向它的网页的重要性加权计算得出。超链接相当于等于获得一票。其PageRank值是通过递归算法基于所有进入该网页(' 链入 网页')的重要性的计算得出。拥有较多 链入 网页通常具有较高的排名。相反地,在没有任何进入该网页('零 入度 网页')的情况下,则该网页无排名。
2005年初的时候, Google为网页链接推出了一个称为‘nofollow’的新属性. 这一举措使得网站管理员和网站作者能够发布一些不具备投票权的链接——这些链接不会被搜索引擎当作网络评论来处理. 不过需要注意的是, 这一规则的应用能够有效抑制网络评论垃圾.
假设一个包含4个页面的小型团队:A、B、C与D。当所有网页均指向A时,则其PR(PageRank)值等于其余网页 Pagerank之和。

接着假设B也指向了C,并且D也指向了包含A在内的3个页面。每个网页只能获得一次投票。因此,在这种情况下,B对每个目标网页分配了半票,而同样地,D投出的所有票中,只有三分之一会计入A的PageRank计算中

换句话说,根据链出总数平分一个页面的PR值。

最终,在计算过程中将所有数据转换成百分比后乘以某个系数来确定其权重。特别地,在数学计算中,默认情况下每个网页都会获得至少这个数值:即那些不包含外部链接的网页不会向其他页面传递任何PageRank值。因此,在数学计算中,默认情况下每个网页都会获得至少这个数值:即那些不包含外部链接的网页不会向其他页面传递任何PageRank值。

在Sergey Brin和Lawrence Page于1998年的原始论文中为每个网页设定的最低PageRank值为1-d;而这里采用的是(1-d)/N的形式。因此,一个网页的PageRank值是基于其他所有网页的PageRank进行计算得出。Google通过不断重复对每个网页进行PageRank重新计算。假设我们对每个网页初始化一个非零随机PageRank值;经过反复进行迭代计算后,这些网页的PR值将逐渐趋于稳定状态。
6.2 实现
两个退出方式
在处理固定数值的迭代步数(静态策略)与公差值退出机制(动态控制)时,在处理面向对象和基于对象模式时。
静态退出方式采用迭代次数变量(numIter)作为判定依据;动态退出方式则以公差阈值作为动态退出条件;当网络图中所有节点的变化量均低于设定公差时,则判定算法收敛并终止运行。
在GraphX的研究框架中,resetProb参数与其在PageRank论文中的对应关系(即1-d)具有相同的含义。其中d值代表抑制因子的大小,在实际应用过程中需要根据具体情况对其进行合理设置。例如,在PageRank算法通常建议采用0.85的 damping factor作为标准设置的情况下,在实际应用中GraphX团队建议将resetProb参数设定为0.15较为合适。
val graph2: Graph[Double, Double] = graph1.pageRank(0.0001)
graph2.triplets
.collect
.foreach(triplet => println(s"srcId=${triplet.srcId} srcAttr=${triplet.srcAttr}--edge=${triplet.attr}--dstId=${triplet.dstId} dstAttr=${triplet.dstAttr} "))
结果
srcId=2 srcAttr=1.3907556008752426–edge=1.0–dstId=1 dstAttr=1.4596227918476916
srcId=4 srcAttr=0.15007622780470478–edge=1.0–dstId=1 dstAttr=1.4596227918476916
srcId=1 srcAttr=1.4596227918476916–edge=1.0–dstId=2 dstAttr=1.3907556008752426
srcId=6 srcAttr=0.7017164142469724–edge=0.5–dstId=3 dstAttr=0.9998520559494657
srcId=7 srcAttr=1.2979769092759237–edge=0.5–dstId=3 dstAttr=0.9998520559494657
srcId=7 srcAttr=1.2979769092759237–edge=0.5–dstId=6 dstAttr=0.7017164142469724
srcId=6 srcAttr=0.7017164142469724–edge=0.5–dstId=7 dstAttr=1.2979769092759237
srcId=3 srcAttr=0.9998520559494657–edge=1.0–dstId=7 dstAttr=1.2979769092759237
参考
https://zzckm.github.io/2019/04/25/1_Spark 之 Graphx/
https://spark.apache.org/docs/latest/graphx-programming-guide.html
