数据应用OneID:ID-Mapping Spark GraphX实现
前言
说明
基于用户的实体模型中,其中ID字段主要包含user_id和device_id两个字段。值得注意的是,并非仅限于这两个字段。每个不同的ID对应的.service周期及其服务周期各不相同。
该设备周期通常是从设备首次被识别阶段开始一直到其不再活跃阶段结束的时间段
在用户完成登录后由系统给予的一个唯一标识符,在同一时间段内只要同一个user_id就会被不同设备视为同一个用户。然而,在未完成注册流程之前是无法获取到该user_id值的这一限制因素将会导致无法访问用户的登录前行为数据
单一应用仅凭user_id或device_id无法全面表征一个用户;不同应用场景下使用的id类型存在差异性。因此亟需建立一种机制实现id间的关联映射;最终目标则是通过唯一的 id 来标识每个用户。
用户渠道
- 手机、平板电脑
- 安卓手机、ios手机
- 有PC、APP和小程序
标识情况
(1)cookieid:PC站存在用户cookies中的ID,会被清理电脑时重生成。
(2)unionid:微信提供的唯一身份认证。
(3)mac:手机网卡物理地址。
(4)imei(入网许可证序号):安卓系统可取到。
(5)imsi(手机SIM卡序号):安卓系统可取到。
(6)androidid :安卓系统id。
(7)openid (app自己生成的序号) :卸载重装app就会变更。
(8)idfa(广告跟踪码):用户可重置。
(9)deviceid(app日志采集埋点开发人员自行设定一种逻辑标识,在逻辑层面的标识可源自 android、imei、openudid等):在逻辑层面上的标识
还有其他不同应用设定标识用户的ID. . . . . .
设备与登录用户分析
1. device_id 作为唯一
场景
适用登录率比较低的应用。
缺点
- 在多个设备上登录的不同身份的用户会被系统视为同一个用户。
- 同一账号在不同的设备上使用会被系统视为多个独立的登录记录。
2. 一个device_id关联一个user_id
场景
同一个设备登陆前(device_id) 和登录后(user_id) 可以绑定。
缺点
- 当设备处于未绑定状态下,在 login 前后用户的属性存在差异性特征,在 login 状态下可能与 login 前的状态产生混淆性误判。
- 已经完成设备绑定的一台设备可能在后续由他人操作(此时尚未进行 account login)时出现身份关联性误报现象。
- 已经绑定了 account 的同一台计算机若用于连接到其他网络环境中的计算机(此时尚未进行 account login),系统会将这些关联操作视为该机器人的操作行为。
3. 多个device_id关联一个user_id
场景
当同一个用户的 user_id 相同时,在不同设备之间实现数据互通。
缺点
每个 device_id 最多只能绑定给一个用户,在同一设备被其他用户尝试登录时,系统仍会将登录前的数据误判为已绑定到该设备的用户的
4. 多个应用间的不同ID进行关联
场景
当存在多个应用时 建立各个应用间的联系并实现信息共享 当存在多个应用时 例如使用手机号 邮箱地址 微信等多种方式将这些联系统一到一个标识符中
缺点
复杂性高。
5. 行业内方案
网易ID-Mapping
网易旗下产品线包括网易云音乐、邮箱服务、新闻资讯以及严选平台等不同功能模块;每个应用程序都具有独特的标识符:如phone标识手机功能模块、email标识通信功能模块、yanxuan_id代表严选会员专属权益模块以及music_id则对应音乐播放相关功能模块等
思路与方案
- 综合考虑各类应用账号、各设备类型间的关联情况及设备使用模式(如时间频率)。
- 通过基于规则的过滤方法和数据挖掘技术来识别账户是否由同一人创建。
存在问题和方案
- 用户具有多个设备信息: 用户依赖特定时间段 和 频次才能实现多设备信息的关联。
- 设备不再被使用: 设备未配置衰减机制。
6. 其他
美团运用多种登录手段包括手机号码微信微博以及美团个人账号让用户便捷地完成注册流程而大众点评则提供了手机号码微信QQ以及微博等多种登录选择两者在登录功能上具备较高的相似度具体表现为都支持手机号码作为主要登录入口此外还都提供了微信微博等社交平台账号供用户选择结合用户体验调研结果最终美团决定以手机号码作为注册用户的唯一标识符以确保账户体系的安全性和稳定性
图计算
图计算的核心理念:将数据以"节点"的形式进行表示,并通过某种业务逻辑建立起连接这两个节点的边。随后我们便可以从这些节点及其关联关系中挖掘出不同数据类型之间的关联模式。

在GraphX中,图由顶点(Vertices)和边(Edges)组成:
- 节点(Nodes):图中的标记位置符号,在现实世界中通常用来象征特定的人类或其他实体。
- 线段(Line Segments):连接两个节点的关系。
- 特征(Attributes):线段所具有的附加信息。
- 标记符号(Node Marks):节点所具有的附加信息。
首先通过一个案例先认识下图计算。
案例:朋友关系的连通性
首先,需要将这些数据转换为Vertex和Edge对象
假设有以下数据:
user_id: A, friend_id: B
user_id: B, friend_id: C
user_id: C, friend_id: D
user_id: D, friend_id: E
user_id: E, friend_id: F
user_id: F, friend_id: G
user_id: G, friend_id: H
user_id: H, friend_id: I
user_id: I, friend_id: J

import org.apache.spark._
import org.apache.spark.graphx._
val conf = new SparkConf()
.setAppName("Graph Example")
.setMaster("local[*]")
val sc = new SparkContext(conf)
// 将原始数据转换为Vertex和Edge对象
val vertices: RDD[(VertexId, String)] = sc.parallelize(Seq(
(1L, "A"), (2L, "B"), (3L, "C"), (4L, "D"), (5L, "E"),
(6L, "F"), (7L, "G"), (8L, "H"), (9L, "I"), (10L, "J")
)
)
val edges: RDD[Edge[String]] = sc.parallelize(Seq(
Edge(1L, 2L,"friend"), Edge(2L, 3L,"friend"), Edge(3L, 4L,"friend"),
Edge(4L, 5L,"friend"), Edge(5L, 6L,"friend"), Edge(6L, 7L,"friend"),
Edge(7L, 8L,"friend"), Edge(8L, 9L,"friend"),
Edge(9L, 10L,"friend"), Edge(10L, 1L,"friend")
))
// 创建图
val graph: Graph[String,String] = Graph(vertices, edges)

// triplets同时存储了边属性和对应顶点信息
graph.triplets.foreach(println)
((4,D),(5,E),friend)
((5,E),(6,F),friend)
((9,I),(10,J),friend)
((10,J),(1,A),friend)
......
// 连通性:可以将每个顶点都关联到连通图里的最小顶点
val value = graph.connectedComponents()
value.vertices.map(tp => (tp._2, tp._1))
.groupByKey()
.collect()
.foreach(println)
结果:(1,CompactBuffer(8, 1, 9, 10, 2, 3, 4, 5, 6, 7))
如果修改:Edge(5L, 1L,"friend") Edge(10L, 5L,"friend")
val edges: RDD[Edge[String]] = sc.parallelize(Seq(
Edge(1L, 2L,"friend"), Edge(2L, 3L,"friend"), Edge(3L, 4L,"friend"),
Edge(4L, 5L,"friend"),
Edge(5L, 1L,"friend"), Edge(6L, 7L,"friend"),
Edge(7L, 8L,"friend"), Edge(8L, 9L,"friend"),
Edge(9L, 10L,"friend"), Edge(10L, 5L,"friend")
))
结果:
(1,CompactBuffer(1, 2, 3, 4))
(5,CompactBuffer(8, 9, 10, 5, 6, 7))

ID-Mapping 简单实现
val conf = new SparkConf()
.setAppName("Graph Example")
.setMaster("local[*]")
val sc = new SparkContext(conf)
// 假设我们有三个数据集
val userMappingData = sc.parallelize(Seq(
(11L,111L), // phone,device_id
(22L,222L)
))
val userInfoData = sc.parallelize(Seq(
(11L, 1111L), // phone,open_id,这里把phone当作user_id
(22L, 2222L)
))
val userLoginData = sc.parallelize(Seq(
(1111L, 11111L, 111111L), // open_id,idfa,idfy
(2222L, 22222L, 222222L)
))
// 为每个数据集创建顶点RDD
// val userVertices = userMappingData.flatMap(item =>{
// for (element <- item.productIterator)
// yield (element,element)
// })
val phoneVertices = userMappingData.map { case (phone, _) => (phone, "phone") }
val deviceVertices = userMappingData.map { case (_, deviceId) => (deviceId, "deviceId") }
val userPhoneVertices = userInfoData.map { case (phone,_) => (phone, "phone") }
val openidVertices = userInfoData.map { case (_, openId) => (openId, "openId") }
val idfaVertices = userLoginData.flatMap { case (openId, idfa, _) => Seq((openId, "openid"), (idfa, "idfa")) }
val idfvVertices = userLoginData.flatMap { case (openId, _, idfv) => Seq((openId, "openid"), (idfv, "idfv")) }
// 合并所有顶点RDD
val allVertices = phoneVertices.union(deviceVertices)
.union(userPhoneVertices).union(openidVertices)
.union(idfaVertices).union(idfvVertices)
// 创建边RDD
val mappingEdges = userMappingData.map { case (phone, deviceId) => Edge(phone, deviceId, "maps_to") }
val infoEdges = userInfoData.map { case (phone, openid) => Edge(phone, openid, "linked_to") }
val loginEdges = userLoginData.flatMap { case (openid, idfa, idfv) =>
Seq(Edge(openid, idfa, "logins_with"), Edge(openid, idfv, "logins_with"))
}
// 合并所有边RDD
val allEdges = mappingEdges.union(infoEdges).union(loginEdges)
val graph = Graph(allVertices, allEdges)

graph.triplets.map(item=> "点 and 边:"+item).foreach(println)
点 and 边:((22,phone),(222,deviceId),maps_to)
点 and 边:((11,phone),(111,deviceId),maps_to)
点 and 边:((11,phone),(1111,openId),linked_to)
点 and 边:((22,phone),(2222,openId),linked_to)
点 and 边:((1111,openId),(11111,idfa),logins_with)
点 and 边:((1111,openId),(111111,idfv),logins_with)
点 and 边:((2222,openId),(22222,idfa),logins_with)
点 and 边:((2222,openId),(222222,idfv),logins_with)

val value = graph.connectedComponents()
value.vertices.map(tp => (tp._2, tp._1))
.groupByKey()
.collect()
.foreach(println)
(11,CompactBuffer(1111, 11, 111, 11111, 111111))
(22,CompactBuffer(2222, 22, 222, 222222, 22222))
说明
真实的数据不一定是Long型,在某些情况下需要特别处理计算过程,并将结果转码为明文。案例中,在数据连通之后,则能够生成统一的标识符。
仅以简要案例为例,在进一步分析时可能会遇到不同情况。更为复杂的情况则可能需要更为 intricate 的逻辑处理。
除了图计算,直接SQL JOIN也即可。
