Advertisement

大数据开发实战系列之Spark电商平台

阅读量:

基于大型企业的电子商务系统的统计数据处理平台。该系统主要采用了Spark框架作为核心组件,并能够完成对各类电子商务日志文件的收集、存储、分类以及离线处理,并同时支持在线实时数据分析。

该基于大数据分析的平台对电商网站的不同类别用户的各项活动(包括浏览记录、购买记录以及精准广告点击等)进行收集并评估。根据该平台整理出的数据集和相关统计结果,在公司的业务运营中起到优化运营策略的作用。

PM(产品经理)、数据分析师以及管理人员通过研究现有产品的状况并基于用户行为分析的结果进行产品设计的持续优化。此外,在此基础上制定相应的公司战略与业务策略,并据此制定相应的公司战略与业务策略,并通过应用大数据技术手段实现提升公司业绩、营业额以及市场占有率的目标。

1.2 项目整体框架

1.3 业务需求分析

项目分为 离线分析系统实时分析系统 两大模块

在该离线分析系统的运作过程中,我们通过虚拟化处理真实的业务场景,将其虚构的业务场景模拟数据存储于Hive数据库表中。随后被该离线分析系统从Hive数据库表中读取并进行数据分析

中获取数据,并根据实际需求(用户访问 Session

数据分析、页面跳出率的数据挖掘、地区热卖商品的数据收集)对数据进行处理并运算后得出相关结果信息;最后将整理完成的数据信息存档至数据库中

MySQL 的对应表格中。

在实时分析系统中,我们将模拟业务数据写入

Kafka 集群中,实时分析系统从

通过 Kafka Broker 接收数据,并利用 Spark Streaming 实时流式处理广告点击流量进行动态分析。最后将统计结果存储至 MySQL 数据库

的对应表格中。

1.4 数据源分析

1.4.1 user_visit_action

存放网站或者 APP 每天的点击流数据

字段名称 说明
date 日期,代表这个用户点击行为是在哪一天发生的
user_id 用户 ID,唯一标识某个用户
session_id Session ID,唯一标识某个用户的一个访问 session
page_id 页面 ID,点击了某些商品/品类,也可能是搜索了某个关键词,然后进入了某个页面,页面的 id
action_time 动作时间,这个点击行为发生的时间点
search_keyword 搜索关键词,如果用户执行的是一个搜索行为,比如说在网站/app 中,搜索了某个关键词,然后会跳转到商品列表页面;
click_category_id 点击品类 ID,可能是在网站首页,点击了某个品类(美食、电子设备、电脑)
click_product_id 点击商品 ID,可能是在网站首页,或者是在商品列表页,点击了某个商品(比如呷哺呷哺火锅 XX 路店 3 人套餐、iphone 6s)
order_category_ids 下单品类 ID,代表了可能将某些商品加入了购物车,然后一次性对购物车中的商品下了一个订单,这就代表了某次下单的行为中,有哪些商品品类,可能有 6 个商品,但是就对应了 2 个品类,比如有 3 根火腿肠(食品品类),3 个电池(日用品品类)
order_product_ids 下单商品 ID,某次下单,具体对哪些商品下的订单
pay_category_ids 付款品类 ID,对某个订单,或者某几个订单,进行了一次支付的行为,对应了哪些品类
pay_product_ids 付款商品 ID,支付行为下,对应的哪些具体的商品
city_id 城市 ID,代表该用户行为发生在哪个城市 ,和城市信息表做关联

1.4.2 user_info

基础信息存储结构;该系统存储了网站所有注册用户的详细记录

字段名称 说明
user_id 用户 ID,唯一标识某个用户
username 用户登录名
name 用户昵称或真实姓名
age 用户年龄
professional 用户职业
gender 用户性别

1.4.3 product_info

普通的商品基本信息表;这张表中存放了网站/APP 所有商品的基本信息

字段名称 说明
proudct_id 商品 ID,唯一标识某个商品
product_name 商品名称
extend_info 额外信息,例如商品为自营商品还是第三方商品

1.4.4 city_info

字段名称 说明
city_id 城市ID
city_name 城市名称
area 地区名称,如:华北,华东,西北

1.4.5 实时数据

字段名称 取值范围
timestamp 当前时间毫秒 类型:数字
userId 用户id,类型:数字
area 东北、华北、西南 类型:文本
city 城市中文名: 北京、上海、青岛等 类型:文本
adid 点击的广告编号,数字

1.5 项目需求

1.5.1 用户访问session统计

对用户的 session 访问数据进行统计分析的具体内容包括:首先计算每个 session 平均步长占总步长的比例;其次计算每个 session 访问时长占比比例;随后按照时间分布的比例随机选取部分 session;最后分别记录并提取每天点击、下单及购买行为的品类排名前十数据;同时从这些 top10 类别中提取前十的 session 数据作为样本集。这一模块能够帮助产品设计团队、数据分析人员以及管理层直观了解不同条件下用户的实际行为模式及其对应的统计指标,并在此基础上做出相应的战略决策优化。主要采用了Spark Core框架来实现这一功能。

1.5.2 页面单跳转化率统计

该模块负责计算关键页面间的单步跳转转化率,并包含基于页面切片算法与页面流匹配算法的核心逻辑。它能够帮助产品经理、数据分析师及企业管理层观察各个关键页面之间的转化效果,并据此进行网页布局的优化设计。主要使用Spark Core作为实现工具。

1.5.3 区域热门商品统计

该模块负责处理每日生成各个区域商品销售排行榜。
该模块帮助管理层了解电商平台在不同地区销售的商品种类及其表现,
并为制定公司商品战略提供依据。
主要使用Spark SQL平台运行。

1.5.4 广告流量实时统计

系统每触发一次广告点击事件后都会生成相应的埋点日志记录;在实时数据分析平台中,该系统会采用特定机制将采集的数据发送至分布式消息队列(如Kafka)。

将日志转发至后台 web 服务器运行着的 nginx 服务中,在其负载均衡机制下均匀分配至多台 Tomcat 服务器。各个 Tomcat 服务器持续地将日志信息记录在其专用的日志文件里,并完成记录后,则会被第三方的日志采集代理如 Flume Agent 进行收集。这些信息随后会被发送至 Kafka 消息队列中进行存储。我们的实时分析系统则会定期从 Kafka 消息队列中提取这些更新的数据,并立即对其进行计算与统计汇总。

该模块的核心作用体现在使产品经理及高管能够即时获取各类广告投放的数据反馈,并为其制定并优化相关的广告投放策略提供数据支持。其主要职责是实时追踪并汇总各项关键指标数据(涵盖展示量与点击量两个核心指标),并通过多种算法构建动态拦截机制(包括但不限于:基于IP地址的静态拦截与基于特征行为模式的动态拦截)以及拦截策略筛选机制(即滑动窗口内各城市展示量与点击量指标监控系统)。具体功能还包括对每个区域每个媒体平台展示与点击情况的数据采集与处理能力(支撑各区域top3展示与点击表现优异媒体平台识别)。

预备知识

2.1 Spark

2.2 SparkCore

转换算子
map 转换集合中的每一个元素
flatMap 转换集合中的每一个元素,压平集合,把集合打碎,打碎成非集合元素
filter 过滤元素
reduceByKey 以key为单位进行聚合
groupByKey 以key为单位进行聚合(聚在一起,RDD[key,value]=>RDD[key,iterable[value]])
left join 左外连接,全集在左边
join 内关联,取左右交集
行动算子
collect 把RDD结果其提取到driver
take 取前n个
countByKey 每个key的个数 Map[key,count]
foreach 遍历RDD中每个元素 ,一般用于每个元素的输出
foreachPartition 按分区遍历
存储
saveAsTextFile 存储为文件
toDF.write.saveAsTable 存储到hive中
toDF.write.format(jdbc).option.save 存储到mysql

2.3 Spark RDD持久化

Spark的核心功能模块之一是可以将RDD(Resilient Distributed Datasets)持久化存储在内存中。每当对RDD执行持久化操作时,在每个节点上都会负责将该节点处理过的RDD的操作partition进行持久化存储于内存中,并且在后续对该RDD进行任何操作时均会直接调用内存中的partition块以完成相关运算。这种方式下,在面对一个特定的RDD需要进行多次重复运算的情况时,则只需要针对该RDD执行一次完整的计算过程即可得到所需的结果;而后在后续的操作中只需直接调用该RDD的一次计算结果即可满足需求;这样一来不仅能够显著提升运行效率还能最大限度地降低资源消耗

要持久化一个 RDD,只要调用其 cache()或者 persist()方法即可。

cache()和 persist()的区别在于,cache()是

一种简化的实现方式persist()
cache()底层直接实现了无参数形式的persist()
同时d直接实现了PersistMEMORY_ONLY模式下的功能,并将输入持久化到内存。

如果需要从内存中清除缓存,那么可以使用 unpersist()方法。

持久化级别 含义
MEMORY_ONLY 以非序列化的 Java 对象的方式持久化在 JVM 内存中。如果内存无法完全存储 RDD 所有的 partition,那么那些没有持久化的 partition 就会在下一次需要使用它们的时候,重新被计算
MEMORY_AND_DISK 同上,但是当某些 partition 无法存储在内存中时,会持久化到磁盘中。下次需要使用这些 partition
时,需要从磁盘上读取
MEMORY_ONLY_SER 同 MEMORY_ONLY,但是会使用 Java 序列化方式,将 Java 对象序列化后进行持久化。可以减少内存开销,但是需要进行反序列化,因此会加大 CPU 开销
MEMORY_AND_DISK_SER 同 MEMORY_AND_DISK,但是使用序列化方式持久化 Java 对象
DISK_ONLY 使用非序列化 Java 对象的方式持久化,完全存储到磁盘上
MEMORY_ONLY_2 MEMORY_AND_DISK_2 等等 如果是尾部加了 2 的持久化级别,表示将持久化数据复用一份,保存到其他节点,从而在数据丢失时,不需要再次计算,只需要使用备份数据即可

2.4 Spark共享变量

Spark 一个非常重要的特性就是共享变量。

通常情况下,在某个算子函数中引用外部变量时, 其值会被复制到每个任务中. 因此, 在这种机制下, 各个任务只能拥有各自独立的变量副本. 若希望多个任务共享同一变 量, 则此方式不可行.

Spark为此支持了两种数据共享机制:广播型存储(Broadcast Variable)与累加型存储(Accumulator)。广播型存储会将所需的数据仅在每个Executor节点处进行一次拷贝,并以提升运行效率的同时最大限度地降低数据传输和内存消耗为主要目标。而累加型存储则允许多个执行任务(task)共同使用同一份缓存副本。Broadcst Variable主要用于读取操作,在不影响其他任务的前提下允许修改者进行更新。

2.4.1 广播变量

2.4.2 累加器

Accumulator 是仅在相关操作中进行累加操作的变量。因此可以在并行计算环境中得到有效的支持。它们可用于实现计数功能(例如 MapReduce 算法或总和计算)。

Accumulator 存在于Driver端,在由Executor节点持续不断地发送至Driver端的过程中,在SparkContext创建时自动初始化(即Driver端在SparkContext启动后即可初始化),因此该数值能够被Driver端获取。存于Driver端的一个独特数值信息(存于Driver端)无法通过其他节点访问

Spark 提供的 Accumulator 主要负责多个节点共享一个变量的操作。该组件仅支持累加运算,并为各个任务提供了一个协同处理同一个变量的能力。然而,在这种设计下,每个任务 仅能对其执行累加运算,并不具备读取其当前值的能力;只有主程序能够访问和读取其当前值。

复制代码

2.5 SparkSQL

2.5.1 RDD 、 DataFrame 、DataSet

RDD

RDD 是 Resilient Distributed Datasets 的全称,在 Spark 中扮演着核心角色。它是数据处理的基本抽象模型,并且作为一个静态分区的数据集合提供给应用程序进行高效运算。它所具有的不可变特性确保了数据的一致性,并支持并行计算以提升处理效率。

DataFrame

它是一种分布式的数据存储容器。相较于RDD而言,在结构化信息存储方面更具灵活性和可扩展性。除了存储原始数据外,在结构化信息方面还需要记录相应的元数据信息即Schema描述模型字段及其关联关系等元数据项同时该系统还支持嵌套式复杂数据类型如复杂数组对象和映射表等这些特性使其功能更加丰富实用

DataSet

搭建过程

sparkmall

离线-需求

1.获取点击、下单和支付数量排名前 10 的品类

简介

对于符合特定条件的 session而言,在数据集中筛选出那些在点击、下单及支付行为方面表现最为活跃的商品类别。具体来说,每个 session可能会涉及多个商品类别,并且这些类别会经历多种互动行为。因此我们需要筛选出那些在点击、下单及支付行为方面表现最为活跃的商品类别。

复制代码

思路

  1. 通过调用Hive表中的get函数来获取用户行为数据
  2. 借助累加器模块将不同品类的各自指标数据进行汇总计算:将原始数据格式(K-V)转换为(category-指标, 总计数SumCount)
  3. 对汇总结果进行重构数据结构处理:将格式为(category-指标, 总计数SumCount)的数据转换为(category, 包含(指标及其总计数SumCount)的数据集合)
  4. 按照相同的品类对重构后的数据进行分组整理
  5. 按照各个品类下的具体指标数值进行降序排列
  6. 筛选并提取排序后排名前10的数据项
  7. 将最终处理完毕的数据结果存储至目标数据库中

2.Top10热门品类中Top10活跃Session统计

简介

对于排名前 10的品类, 分别 获取其 点击次数 排名前 10的 sessionId。

思路

基于需求1的结果对原始数据执行筛选操作。
对筛选出的数据进行字段结构调整:将原始字段拆分为category和sessionid,并新增一个标识字段。
对结构调整后的数据应用聚合函数处理:针对每个category和sessionid组合计算总计值。
将汇总结果重新组织成规范化的格式:每个条目由category唯一标识,并包含session号及其对应总计值。
按照category字段对处理后的数据分组归类:每组记录包括一个唯一的session号及其总计数值。
按指定排序规则整理处理后列表,并提取排名靠前的前十项实例。
最终整理完毕的数据信息被导入至MySQL数据库中存储。

3.页面单跳转化率统计

简介

统计页面单跳转化率

然后找出那些不仅访问过网页3还紧接着访问过网页5的PV数(记为B)

最后将B与A进行比较即可得到该点击事件的发生概率

即:(B / A) 就是该特定路径下的点击转化率

通过这种方式我们可以清晰地了解不同广告位之间的关联性以及广告效果的表现形式

这种指标在评估广告投放效果方面具有非常重要的参考价值

因此掌握这一概念对于数字营销从业者来说是非常基础也是至关重要的一步

通过这样的分析我们可以更好地优化广告投放策略并提升整体营销效果

总结来说这个指标为我们提供了一种量化评估工具来监控和优化在线营销活动中的关键绩效指标(KPI)

这对于提高企业的数字营销效率具有重要意义

通过持续的数据收集和分析企业可以根据实际运营情况不断优化广告投放策略以实现更高的商业目标

这也体现了数据分析在现代市场营销中的核心作用以及其不可替代的价值所在

因此学习并熟练掌握这类核心概念对于从事数字营销相关工作的人员来说是非常必要的一步

通过这样的学习过程我们可以建立起对数字营销中各项关键指标的基本认知并为进一步的专业学习打下坚实的基础

页面的访问时有先后的,要做好排序

思路

  1. 从行为表中获取数据(pageid)
  2. 对数据进行筛选过滤,保留需要统计的页面数据
  3. 将页面数据进行结构的转换(pageid, 1)
  4. 将转换后的数据进行聚合统计(pageid, sum)(分母)
  5. 从行为表中获取数据,使用session进行分组(sessionid, Iterator[ (pageid , action_time) ])
  6. 对分组后的数据进行时间排序(升序)
  7. 将排序后的页面ID,两两结合:(1-2,2-3,3-4)
  8. 对数据进行筛选过滤(1-2,2-3,3-4,4-5,5-6,6-7)
  9. 对过滤后的数据进行结构的转换:(pageid1-pageid2, 1)
  10. 对转换结构后的数据进行聚合统计:(pageid1-pageid2, sum1)(分子)
  11. 查询对应的分母数据 (pageid1, sum2)
  12. 计算转化率 : sum1 / sum2

实时-需求

Kafka创建主题

复制代码

Kafka生产者

复制代码

Kafka消费者

复制代码

1.广告黑名单实时统计

开发实时动态防作弊系统:当某广告日均点击次数超过100时自动拦截该用户。 将新加入的不良用户信息存储至redis数据库,并对该列表定期清理。 已加入黑名单的信息不再被用于后续广告匹配。

思路

复制代码

在程序中会遇到三个问题以及解决

问题1:task中使用的第三方对象没有序列化(连接对象)

在Executor节点创建连接

问题2:黑名单的数据只取了一次

希望获取数据的操作可以周期的执行(transform)

问题3:java序列化会出现无法反序列化(transient)的问题

采用广播变量来传递序列化数据

2.实时数据分析: 广告点击量实时统计

每天各地区各城市各广告的点击流量实时统计。

3.实时数据分析: 每天各地区 top3 热门广告

每天各地区 top3 热门广告

增加部分

1. 统计一分钟的广告点击率(实时)

2. 统计每个页面平均停留时间

全部评论 (0)

还没有任何评论哟~