Advertisement

客户案例:电商平台大数据上云实践

阅读量:

该知名电商平台成立于公元后某年(如: 公元后某年),并以卓越的品牌技术运营和服务能力占据着跨境电子商务领域的行业领先地位。平台在其核心竞争力——品牌优势、技术优势、运营优势及用户优势四大维度上已建立起竞争优势。截至某一时间点(如: 某年某月),该平台为来自X个国家和地区的Y万名注册买家提供服务,在帮助他们与中国及其他国家的Z万卖家建立联系的同时(如: 建立起联系),每年拥有超过W万个在线商品可供选择,并通过多种物流线路满足客户需求(如: 满足需求)。平台拥有超过V条物流线路以及M个海外仓点(如: 点),并提供灵活多样的支付选择(如: 支付方式)。在全球主要市场均设有当地分支机构或代表处(如: 地方性机构)。

挑战与目标

面临的挑战

随着跨境电商逐渐成熟,其经营范围不断扩大的同时伴随着品类与渠道数量增加以及多种新技术在提升运营效率的场景中得到广泛应用的情况下,
对经过 20 年的数据积累与沉淀,在深入挖掘、分析与利用大数据方面面临诸多挑战,
这些挑战主要体现在成本控制、算力优化以及系统扩展等方面的限制上。
其中,
首先,
硬件设备投入大且维护成本高昂,
自建大数据集群需要包括服务器、存储设备、网络设备等多个关键硬件设施,
并且CDH平台有100个节点数免费上限,
这就要求企业持续投入大量人力物力才能完成系统的日常维护与升级工作;
其次,
弹性伸缩能力不足导致系统响应速度慢,
从采购到部署上线通常需要耗费一个月时间,
无法及时应对突发性的业务高峰需求;
再次,
计算存储耦合带来的资源竞争问题严重,
计算节点与存储节点共用同一物理节点资源时容易造成资源利用率下降;
最后,
现有的算力引擎如Hive On Spark(2.4版本)在性能上相比最新的Spark/Tez平台仍有较大差距,
现有方案难以通过简单的横向扩容或纵向升级来显著提升计算效率。

预期上云实现目标

  • 智能湖仓架构

构建智能化仓配体系, 通过全方位的数据流处理实现资源优化. 力争在系统运行初期就完成对全业务链条的支持. 持续推动各环节间的无缝衔接, 最终达成对物流资源的最大化利用. 通过深度挖掘价值, 实现精准分层与热点分析, 达到提升整体运营效能的目标.

  • 精细化运营成本管控

制定针对云资源的精细化运营管理方案及成本管理机制,在提升资源利用率的同时实现降本增效目标;通过优化资源配置策略使云资源根据业务需求动态调整规模,在确保服务稳定性的基础上显著提升业务灵活性与应对速率;借助云原生技术提供的智能分层架构、自动化管理与运维能力,在降低运维门槛的同时持续优化运维效能与服务质量

  • 一站式数据平台底座

构建集数据集成、数据开发、数据资产管理、数据服务等于一体的全链路大数据平台

数据架构及技术方案

本地大数据的技术组件及架构(IDC)****

该图展示了我们团队在大数据与云计算领域的最新研究成果

IDC 大数据环境基于 CDH、大数据开源生态组件、商业及自研工具构建。

数据源:涵盖数百个 MySQL、Oracle 以及 NoSQL 数据库实例(按库表区分),几十万张源表(分库分表),几十 TB 数据。

该系统采用先进的分布式架构设计,在每天成亿级的数据增量下持续稳定运行。实时传输至Kafka集群后, 系统通过负载均衡策略实现了资源的最佳利用, 从而保障了整体系统的稳定性和可靠性, 同时支持离线与实时的大规模数据分析处理需求。

基于 CDH 6.x 架构的大规模数据集群:通过 Cloudera Manager 轻松配置和管理 Hadoop 集群,并支持实时监控运行状态及故障排查。确保稳定的离线与实时计算引擎服务。

OLAP引擎:根据不同应用场景进行配置以部署ElasticSearch、ClickHouse和StarRocks查询引擎支持买卖家及业务运营方进行在线查询

业务应用:常见的报表及可视化工具包括Hue、Tableau和BO;自主开发的 EOS 系统以及与服务化接口进行对接的应用。

数据安全方面:该系统采用了集成的方式实现了统一用户认证与鉴权功能来保障数据安全。具体而言其中Kerberos提供了身份验证的核心支持Sentry则实现了基于角色的精细权限控制而LDAP则负责用户和组信息的集中管理这些技术协同作用显著提升了大数据集群的安全性与管理效能

数据开发平台:该平台采用了基于开源技术和自主研发成果的混合应用方案。其中,在任务调度部分设计了基于DolphinScheduler的任务调度机制,并在此基础上进行了功能模块的扩展与优化以实现可视化配置能力。此外,在数据分析流程中重点针对数据血缘、元数据以及生命周期管理等相关领域进行了深入研究与系统性设计以提升整体数据分析效率

本地 大数据集群负载(IDC)

离线大数据集群运算资源的利用率表现显著波动特征,在CPU、内存、硬盘存储以及网络IO等运算资源上均可观察到明显的时序规律。每日出现高峰时段:每日固定时段进行离线计算作业;在工作时段中分别于上午9至11点及下午2点至4点这两个时间段展开常规报表查询运算;平均运行效率约为30%。

下面是某个集群的工作负载截图:

CPU 负载:

Yarn 内存使用:

这个方法很有效
这个方法很有效

HDFS IO:

https://s3.cn-north-1.amazonaws.com.cn/awschinablog/dhgate-group-big-data-cloud-practice4.jpg

dhgate-group的大数据云计算实践4.jpg

Network IO:

前期技术方案调研

前期调研阶段(即在前期调研阶段),云为科技投入了大量时间和精力对 Amazon EMR、Redshift 和 S3 进行全面评估和系统性测试,并获得了超出预期的好结果。因此最终决定将亚马逊云科技作为大数据云计算平台的基础架构选择

架构兼容性 技术先进性 算力 维护难易成本 开发平台 扩展性 成本
支持现有企业架构配置 采用先进技术并具有开放性,并能快速完成功能迭代 高效率的Spark EMR系统运行速度是CDH平台的两倍 EMR中端与低数仓配置
支持外部第三方开发平台集成 组件设计全面且支持按需扩展全球范围内的节点资源 全面优化后可实现按需扩展资源分布情况 中端配置

本次调研中,以云服务为主轴对包括离线Hadoop集群以及数据仓库服务配置情况的具体实施进行了分析.

项目 亚马逊云科技 DHgate
离线集群 数据分析服务 EMR CDH
主版本 6.3/5.34 CDH6.x
Hadoop 3.2.1/2.10.1 3.x
Hive 3.1.2/2.3.8 2.x
Spark 3.1.2/2.4.8 2.4.0
Master****节点 m6g.2xlarge

| Core****节点 | m6g.8xlarge
EBS:600GB | – | |

| Task****节点 | m6g.8xlarge
EBS:600GB | – | |

创建集群 15min
弹性收缩 2min,服务无需重启
Spot****竞价实例 预计可节约成本60%
一次性集群
风险点 版本升级Spark3兼容 100节点限制不能扩容
数据仓库 数仓服务 Redshift Spark-SQL
协议兼容 redshift-jdbc,兼容postgresql spark-jdbc
服务器资源配置项 ** rack server 3.4xlarge**
虚拟 CPU 数量 12
存储容量 96 GB
动态分配功能 spark节点动态分配功能启用
最大执行任务数 spark.dynamicAllocation.maxExecutors=60
每个任务核心数 spark.executor.cores=4
每个任务内存配置 spark.executor.memory=12GB
节点配置项
物理节点数量 数量为4台物理节点
虚拟 CPU 最大支持 最多支持60个虚拟 CPU
总存储容量 1PB
存储资源项
总体容量 无限制

| 数据存储 | 列式
—————列式———存储模式
—————数据占用空间约为Parquet存储空间的1.2至2倍 ———— Parquet列式 | |
| 性能 | 高效 —— 高性能 —— 性能与节点数量呈正比 | 低效 —— 低性能 —— 在资源充足的情况下运行时间基本稳定 —— 平均运行时长通常在76秒左右 ||

| 稳定性 | 高
测试期间未发生任何SQL执行异常 | 中高
稳定性和数据完整性、SQL性能优化及资源分配效率相关 | |
| 缓存 | 结果集缓存加速
数据缓存采用高效缓存机制 | 数据缓存 | |
| 弹性扩缩 | 最小节点数2
系统可动态扩展至5分钟内最高扩展收缩能力 | 动态调整节点数量 | |
| 维护优化 | 极简
系统提供智能自动生成优化建议的功能模块 | 开发人员可通过系统界面自主完成表结构优化配置 | |
|权限管理|列级权限|基于细粒度粒度的安全策略管理方案 ||

| 风险点 | 未分区配置通常情况下自动优化大表性能一般 | 数据倾斜可能导致内存溢出
Hive on Spark SQL的部分执行效率较低 |

在调研过程中,云为团队进行了系统的基准测试对比工作。从生产业务涉及的数百个查询场景中随机选取了42个典型的SQL语句,并分别对EMR/SparkSQL、CDH Hive on Spark及CDH/SparkSQL三个计算引擎的执行时间进行了详细对比分析。整个测试过程采用了单线程模式,并成功去除了Spark环境启动时间和缓存机制的影响因素,在此基础上得出了以下测试结论:经测试表明,在相同的硬件配置条件下,采用EMR平台运行的SparkSQL处理效率较之于CDH/SparkSQL版本高出至少两倍以上。这一显著提升效果主要得益于EMR 5.24版本后对SparkSQL运行时所实施的一系列优化措施成果;有关EMR运行时支持Apache Spark的技术细节,请访问官方博客:Amazon EMR introduces EMR runtime for Apache Spark | AWS Big Data Blog

云为对同样的查询SQL针对Redshift兼容性进行优化后,在RA3类型(存算分离)下开展数仓业务查询并发测试工作

云上的架构规划

新大数据架构采用了亚马逊云科技提供的EMR、Redshift以及S3等组件,并结合IDC自建的服务与亚马逊云科技资源相结合构建而成

数据源和数据缓冲区:新架构引入了EFS网络文件系统以及多账号支持的S3服务用于文件分享与传输功能,并同时支持同一区域的数据存储结构

离线计算集群:基于 EMR 架构搭建大数据集群,在主节点数量上设置为 3 个 Master 结点;按照固定规模部署 Core 和 Task 节点;通过弹性伸缩资源实现离线计算能力。

OLAP 引擎:增强了 Redshift 的数仓查询功能支持,并沿用 IDC 的现有生产服务组件配置

业务应用:通过报表可视化工具将Tableau、BO、Davinci等导入到FineBI中去,并实现了数据门户的一致性。

数据安全:采用了 Ranger+Ldap 来提供统一用户认证与鉴权服务,并确保了数据的安全性。Ranger 通过替代 Sentry 实现了对细粒度授权控制的支持;同时,在 S3 上的数据访问权限则由 IAM 策略进行管控。

数据开发平台:该企业的大数据团队与合作伙伴共同建设一个集数据集成、数据分析、智能处理和全方面数据管理于一体的智能化大数据平台。

云上新架构能够带来的价值
  • 弹性伸缩:遵循亚马逊云科技EMR存算分离架构,在计算资源层根据数据分析任务智能调配不同的算力资源,在短短秒级别内即可实现计算实例的增减调节功能,并解决了传统IDC资源采购到部署上线所需时间过长以及预制算力可能带来的资源浪费问题。
  • 性能提升:相比开源Spark而言,在相同资源投入下 Amazon EMR 的 Spark 运行时性能提升了1.7至2倍左右;而Presto同样进行了运行时优化,在原有基础上性能提升了约2.7倍左右;此外 Amazon EMR 还支持接入OLAP引擎实现交互式查询分析功能。
  • 成本节省: Amazon EMR 根据实际负载需求可动态调整集群规模,在业务高峰期增加实例数量并在业务淡季减少集群规模从而降低运营成本;同时提供了多组实例选择方案允许企业采用按需实例保障处理能力并利用竞价实例加快任务完成速度的同时还能结合不同实例类型以充分利用其各自的经济优势;此外 S3 智能分层存储管理技术可在不影响数据读写性能的前提下将存储成本降低约40%。
  • 开发效率:作为全托管式的云端数据平台 Amazon EMR 支持常驻集群模式以适应日常离线任务处理以及临时数据分析需求;同时支持通过直观友好的控制台界面或API快速搭建集群配置从而大幅降低了开发人员的工作强度并为企业大数据团队提供了更多集中精力开展创新探索的空间。
  • 平台化数据底座:基于亚马逊云科技智能湖仓架构打造了一个统一的数据基础平台实现了对原始数据加工清洗数据以及模型化数据等环节的高度整合;该平台不仅能够提供高并发精准化高性能的历史数据实时数据查询服务还能够支撑分析报表批处理数据挖掘等功能满足企业多维度分析需求。

核心技术架构

部署架构图

存算分离
  • S3 EMRFS 的智能分层

在大数据环境下推行存算分离架构已被行业普遍认可。相较于传统的 HDFS 三副本磁盘存储方案,S3 对象存储不仅提供了近乎完美的数据持久性保障,而且在成本方面更具优势:采用 EBS(GP3)方案时,相比采用 S3 标准存储模式,每 GB 数据的成本可降低约70%。此外,在 S3 上仅需支付一份数据的成本即可实现全量的数据储存,从而显著降低了运营成本。随着存算分离策略的实施,系统架构将更加灵活多变,能够实现按需弹性扩展计算资源:通过 Amazon EMR 的 Auto Scaling/Managed Scaling 功能,计算资源的增缩操作将变得更加便捷高效

该文件系统由亚马逊云科技自主开发为HDFS的一个实现版本。该S3-optimized Committer深入优化了大数据场景下的S3数据写入效率,并非仅限于Spark数据处理中的Multipart上传策略这一方面,在Hive数据处理中同样地实现了类似的技术以规避三次重命名操作带来的问题,在测试场景中的实际表现显示出显著提升效果( _[Up to 15 times improvement in Hive write performance with the Amazon EMR Hive zero-rename feature | AWS Big Data Blog](https://aws.amazon.com/blogs/big-data/up-to-15-times-improvement-in-hive-write-performance-with-the-amazon-emr-hive-zero-rename-feature/ "Up to 15 times improvement in Hive write performance with the Amazon EMR Hive zero-rename feature | AWS Big Data Blog) _),其中从EMR 6.5.0及其以上版本开始支持此功能

 ​

S3 的智能分层能够基于文件访问模式动态调整数据分布至最优存取层级,并从而实现存储成本的显著降低。该系统提供三种高效的读写层级结构:频繁存取层级(FA)、不常存取层级(IA)以及归档即存档层级(AIA)。通过内置集成机制,EMRFS 能够无缝衔接 S3 智能分层功能,在配置时只需将 S3 存储类别设置为智能分层专用类型即可完成部署。

大多数数据场景需要频繁地调用最近三个月的数据集,并且在同一时间也会较少地调用三个月前的数据。在访问三个月之前的数据显示时(即S3标准存储),系统依然能够保持与该服务相当的响应速度。S3 的智能分层架构支持独立于标准存储的快速查询(IA)和异步插入(AIA),它们与标准层级同样具备低延迟和高性能。通过配置 S3 的智能分层架构,在保持高性能的同时还能进一步降低存储成本。

Hive on Spark到 SparkSQL 的升级****

在执行从 CDH 6.x 至 EMR 6.x 的迁移操作过程中,并行处理从 Hive On Spark(Spark 2.4)升级至 SparkSQL(Spark3.2)的过程中也遇到了需要进行改造适配的挑战。

问题类型 问题详细描述 解决方案
sql 语法兼容性 spark-sql 执行代码关联条件nvl(t.item_code,rand()) = p.item_code 报错,Error in query: nondeterministic expressions are only allowed in spark sql 不支持在 on 条件中带有 rand(),需要去掉,或者改写成 t.item_code = p.item_code and t.item_code is not null
sql 语法兼容性 spark-sql 执行代码中row_number() over()报错 修改成 row_number() over(order by 1)
sql 语法兼容性 count(distinct rfx_id) over(partition by buyer_id) rfx_num 报错 spark-sql 不支持开窗中count(distinct ),使用size(collect_set() over(partition by order by))写法
sql 语法兼容性 spark-sql 执行 hive parse_url_tuple 函数报错 使用 regexp_extract(link_url,’http[s]://[/]+([?&#]+).*’,1)截取
sql 语法兼容性 hive on tez 执行,代码中带有 time 字段,报错 time 进行转义,在 hive sql 中写成time形式,在 shell 中写成`time`形式
函数兼容性 timestamp 函数报错,Function timestamp accepts only one argument 去掉第二个参数:yyyy-MM-dd hh:mm:ss
函数兼容性 split(visit_src_type,’|’)[3] 切出和原来数据不一致 增加转义符, 由 split(visit_src_type,’|’)[3] 变换成 split(visit_src_type,’\ |’)[3]

| 函数兼容性 | 当前判断条件为 confirm_date > add_months(‘${etl_date}’,-12) ,而 CDH 中包含的是 12 个月前的一天及之前的时间点;EMR 和 Spark-SQL 在执行时则不包含当天的值 | 将判断条件修改为 confirm_date >= add_months(‘${etl_date}’,-12) ,即包含 12 个月前的一天及之前的时间点 |
| 数据类型兼容 | 在 SQL 中字符串与数值进行比较存在关联性问题;RFX_GMV > 0 被用作过滤标准 | 修改 RFX_GMV > 0 至 RFX_GMV > 0.0;通过字段重定义将 RFX_GMV 的数据类型更改为小数型(Decimal(18,4))以提高精度 |

| 数据类型兼容 | spark 数据类型转换:Cannot safely cast ‘xxx’ :string to int | 1、添加参数 set spark.sql.storeAssignmentPolicy=LEGACY
2、字段做类型转换 cast(xxx as int) as xxx |

spark 运行时 spark-sql-e 执行,报错 Error in query: Cannot modify the value of a Spark config: spark.executor.cores 去掉 sql 中 set spark.xxx.xxx=xxx;类似的参数
空值处理 null 判断不一样 is not null and <>” is not null

| 空值处理 | 在CDH中对字段的空值赋值为' ';在IDC中字段的空值赋值为\N;设置过滤条件为字段名不等于' '时与EMR的结果集不一致 | 重新构建数据表,并新增Hive行格式化序列化引擎ColumnarSerDe;配置文本文件格式;在建表语句中增加TBLPROPERTIES参数设置('serialization.null.format'设为'')以实现相同的序列化格式

在 CDH5 至 CDH6 集群升级项目中,在较早的时间段内实现了计算引擎从 Hive 到 Hive On Spark 的迁移过程,并带来了显著的性能提升效果;同时解决了SQL语义理解相关的一系列兼容性问题以及UDF运行效率方面的技术挑战,并对数据文件格式适配性和系统运行参数设置等多方面的兼容性问题做出了优化改进工作;这些问题不在一一列举范围内

Kudu+Impala到数据湖(Hudi)的改造****

为了应对这一复杂需求,在大数据平台上实施了超过900张数据库表的数据同步工作以支持数据分析工作。通过IDC平台,在其Binlog Sink日志流中引入Canal解析器对日志文件进行解析后所得的数据结果会被自动转发至Kafka系统中实现各系统的解耦关系同时提升整体的数据回溯能力如可以从Kafka的一个特定时间点Offset位置获取相关的历史记录信息。接下来借助Kudu系统的Upsert功能将这些来自Kafka的数据直接存储于该系统中从而保证了数据库的一致性和完整性。最后每天都会通过Impala工具从Kudu提取这些日志存入HDFS作为原始数据分析基础的工作依据

尽管该方案具有一些优势,然而存在三个主要的问题.首先,KuDü本身基于磁盘存储引擎,导致其存储成本与运维维护费用相对较高;其次,Impala在高并发情况下可能存在稳定性不足的问题,可能会影响数据输出与查询操作;第三,因为原始数据需要经过大量的预处理步骤才能满足业务报表的需求.无法直接通过Impala从KuDü的原始表获取所需的数据用于业务报告.综合考虑了成本、性能以及扩展性的需求后发现,在现有条件下构建KuDü仓库并非最佳选择.因此,KuDü仅作为数据中间层存在,其性价比较低,也不适用于所有实时数据分析需求.

在云端构建智能湖仓系统时, S3 被选定为主干数据存储平台, 并根据具体业务场景合理配置计算引擎资源以达到最佳成本效益. 这种设计能够让数据在存储平台与计算引擎之间实现无缝对接. 针对拥有900张表 CDC 的业务模式, 推荐采用 Hudi 平台来应对这类场景的需求. Hudi 在亚实时处理方面表现突出, 支持毫秒级别延迟写入能力的同时提供灵活的数据传输策略选项, 包括按顺序组织数据处理流程可显著提升系统吞吐量. 此外, 系统还具备强大的容错机制, 可根据实际负载自动调整资源分配比例. 在实际应用中, 我们会基于历史数据分析最优配置方案并定期进行性能评估. 这种做法不仅能够有效降低运营成本还能确保系统的稳定运行.

基于Spark Structured Streaming框架对Kafka多个主题(由源端实例和库组合划分主题名)进行读取处理,并将不同主题的数据同步发送至900张Hudi表作为CDC数据输出。在开发过程中可分享以下几点:

采用 Spark 作业实现 900 张表的数据批量写入。通过多线程机制实现多张表的数据并行处理。每个作业负责管理约 30 至 100 张表格,并根据每张表格的数据量大小动态分配负载。举例而言,在数据量较小的情况下(例如一张小表格),单个作业可以管理约 1 到 15 张表格;而在数据量较大时(例如一张大数据库),则由单个作业负责管理约 3 到6 张表格。理论上只要资源充足,则一个作业就可以完成全部946 张表格的数据处理工作。然而在实际应用中,则需要考虑故障快速恢复机制以及业务优先级等问题。因此尽管单个作业支持多线程来进行批量数据读取与存储操作(即 foreachBatch 块),但无法达到同样效率的同时运行多个独立批处理任务的效果。在 Structured Streaming 运算中的 foreachBatch 块中,则是采用独立的线程群组来进行数据读取与存储操作:首先将每批 DataFrame 分割后发送至各独立作业,并由各独立作业按照指定条件筛选出对应的子集数据块;最后通过 unpersist 方法完成该批次的数据处理过程。

技术团队曾尝试采用多种Streaming Query方式来进行数据写入操作。然而这种方案存在缺陷每一个Query都会独立地从Kafka中消费数据从而导致Kafka的数据流消耗急剧增加此外想要通过Kafka监控工具直接查看offset也比较麻烦因为每个Streaming Query都对应有自己的消费者组offset若采用同一个消费者组则会导致offset记录出现问题进而被其他表的offset覆盖

2. 目前所使用的 EMR 版本号为 6.6.0。Hudi 的当前版本号为 0.10.1。
尽管 Hudi 是一个独立的软件包,并具备完整的功能集合,
但其计算引擎采用了解耦设计,
这使得它并不依赖于 Spark 运算的核心组件,
但同时需要注意的是,
直接从开源社区获取并部署于 EMR 上并不推荐。
原因在于,
Amazon EMR 对 Spark 进行了深度优化,
对源码进行了特定修改以提高性能,
但 API 层仍然保持与开源项目的兼容性一致。
而 EMR 内置了一个经过本地编译和适配过的 Hudy 软件包,
这在特定场景下可能会影响兼容性表现。
因此,
为了保证最佳运行环境和稳定性,
最好使用已经适配过的内建 Hudy 包而非直接引用开源版本。

当前EMR系统已升版至6.10.0版本号(6.10.0),同时Hudi平台也已升级至当前稳定版号(0.12.2)。建议选用最新版本系统运行环境(EMR)与数据处理平台(Hudi),因为Hudi平台高版本号系统在稳定性与性能方面均有显著提升;并且在功能完善性方面也有明显优化。具体而言,在并发写支持方面有重要改进:基于FileSystem的锁机制使得在执行Compaction或Clustering操作时更加便捷可靠;此外,在Schema变更管理方面也实现了相应优化:例如,在执行表上Compaction或Clustering操作时可直接配置基于文件系统的锁机制;对于Schema变更相关的数据迁移操作,则可通过配置hoodie.datasource.write.schema.allow.auto.evolution.column.drop参数来实现无需Alter语句即可完成列删除功能;特别地,在动态Schema变更场景下这一操作将变得更加简便易行。

3. 对于 COW 和 MOR 表模式及 Index 的选择,可参考以下指标:

  • Write latency for COW tables exceeds that of MOR tables, with a write amplification efficiency of 100MB and 5 batches completed in (100+500)MB. Before compaction, the storage overhead of MOR is 150MB (100+50)MB.
  • Updating MOR tables is computationally less expensive due to their use of write deltafiles and merge operations.
  • Query performance depends on whether COW tables offer better throughput compared to MOR. Additionally, COW tables are simpler to maintain as they lack deltafile management and compaction logic.
  • Bucket indexes exhibit superior performance under high update rates and large-scale data scenarios. They avoid the pitfalls of Bloom index false positives by using recordkey hash functions to directly locate FileGroup buckets based on their number.
  • Hudi's PartialUpdateAvroPayload implementation addresses the challenge of partial field updates in multi-table real-time joins. This payload design significantly enhances usability by eliminating the need for complex multi-stream join handling.
  • When performance requirements are met, inline compaction for MOR tables is preferred. However, implementing compaction logic requires exclusive access to database locks, which can be cumbersome in certain environments like Hudi version 12. If performance constraints are not met, alternative solutions involving separate compaction programs should be considered due to increased maintenance costs.
  • As a general guideline, a write rate of less than 2 weeks per second with less than 30% update frequency is acceptable for Cow tables. Exceeding these thresholds warrants the use of MOR table structure.
  • Flink Hudi implementation leverages streaming reads to create an end-to-end real-time processing pipeline across the entire data warehouse. While this approach shows theoretical promise, extensive production adoption across various layers remains limited. Current implementations predominantly utilize ODS and DW layers with Hudi primarily serving as a metadata hub for incremental queries. However, it presents challenges when it comes to supporting incremental writes for higher aggregation levels within subsequent data warehouses or when implementing real-time query capabilities at minute-level granularity.

4. Spark 写入 Hudi 过程中的参数配置调整如下做参考:

采用 Spark Structured Streaming 技术进行 Hudi 数据的写入操作。当前采用的触发机制并非持续进行数据 streaming 写入。主要基于成本效益考量,在离线集群上每天定时启动并运行两个作业流程,每个作业的总运行时长约为 1.5 小时,在总量上占用了离线集群资源的约 50%。通过批量处理的方式可以有效降低资源消耗并优化成本控制。

针对那些对实时性有高要求的表单数据集,在进行数据处理时应优先选择基于数据流的写入方式以确保每分钟级别的摄入延迟。

  • 采用COW表结构,在实现写操作时会伴随数据放大现象;然而其结构相比MOR更为简单直接,并无需进行数据压缩过程

  • hoodie.datasource.write.table.type=LCBProtocol

  • Spark SQL 的预设串行度设定为 200,默认情况下会开启较多的串行处理任务。由于在多表数据同步操作中同时处理多个表时会增加处理负担,在这种情况下适当降低其预设值以优化性能是一个合理的选择。具体来说,在同时进行多个表的数据同步操作时,默认设置可能会导致资源闲置的问题。因此,在实际应用中建议根据具体情况将串行度调整为 10 至 20。

  • hoodie.upsert.shuffle.parallelism=20

  • hoodie.insert.shuffle.parallelism=20

  • 小文件控制策略:为了防止过度数据放大, 不会设置过于庞大的参数, 当使用 CON 表时, 在对实时数据摄入要求较高的情况下, 则建议将该参数调低, 需要注意的小问题在于小文件数量可能会增加, 并需定期执行聚类操作.

  • hoodie.parquet.small.file.limit=512MB

  • 该系统采用了 BLOOM 方法来处理索引选择问题

  • 由于在 Hudi 的版本号为 0.10 的阶段中尚未实现一致性的哈希桶数量无法更改

  • 后续将考虑引入 Bucket Index

  • 该设置将全局配置为 Bloom 索引类型

  • 由于需要将DefaultHoodieRecordPayload乱序入湖

  • hoodie.dataSource.write.payload.class=org.apache.hudi.common.model.DefaultHoodieRecordPayload

  • 未划分区域(即未进行分区划分)的主要目的之一是为了简化操作流程

  • 由于无需向非开放业务的Hudi表进行查询
    hoodie.datasource.write.keygenerator.class = \text{org.apache.hudy.keygen.NonpartitionedKeyGenerator}

  • $hoodie.metadata.enable = true

  • 临时可以选择性地跳过对大量时间的数据增量读取,并将 commit 保留设置的数量保持在较低水平。

  • 同时,在 active timeline 中引用对应版本的数据文件时会自动调用 retained 存储的五次提交。

hoodie.cleaner.commits.retained=5

  • 每隔 2 次提交时会对 retained 的状态进行核查一次,并确保其符合预设的要求。

hoodie.clean.max.commits=2

  • 达到 6 次后就会触发 save action,并要求其 minimum value must be greater than the retained value。

    • hoodie.keep.MIN.commits=6
    • hoodie.keep.MAX.commits=7
  • type 被设置为 DIRECT。

  • 没有采用 Timeline-server 基础标记。

  • 当时在编写 S3 代码时遇到了 Markers 不存在的错误,并与社区讨论后暂时采用了 DIRECT 方式。

  • hoodie.write.markers. type=DIRECT

乱序入湖,Hudi ts 字段生成逻辑

为了确保同一记录在 binlog 中多次变更后仍能被正确处理,在 Kafka 上实现数据持久化存储并同步至 Hudi 确保数据一致性的同时

CanalJsonTS:Canal 解析 Binlog 写入 Hudi 的时间戳(毫秒)

KafkaOffset:当前记录所在 Kafka Partition 的 offset 值

CanalJsonArrayIndex:当一条 Kafka 消息中的 Data 数组包含多条数据时,按照索引顺序合并

Kudu+Impala 成本节省

在 IDC 应用场景中, 采用了 16 台高性能服务器来部署 Kudu 服务, 支持公司超过 95% 的业务数据库处理全量及增量数据, 显著提升了业务数据的秒级延迟入库能力;同时支持基于 Impala+Kudu 的实时计算功能, 然而, 在数据存储与应用成本方面存在不足;此外, 后期维护成本较高, 在系统稳定性、扩展性以及并发查询性能等方面也未能达到预期要求;转而采用 Hudi 方案后, 相较于之前系统架构节省了超过75%的硬件投入成本;借助 S3 数据湖平台作为存储基础, 实现了各计算引擎间的自由共享机制;该方案不仅显著缩短了准实时分钟级别延迟的时间间隔, 更能充分释放 S3 数据湖的优势

Spark交互式查询 Kyuubi****

为了使业务方获得统一配置的JDBC服务接口而考虑,在系统中部署了SparkThiefServer以及Kyuubi服务;以下是使用过程中的心得体会:因SparkThiefServer不具备多租户功能而故,在未来计划中将优先采用Kyuubi方案

SparkThriftServer

在实际运行过程中

适当提高 driver-memory 和 spark.driver.maxResultSize 的参数设置,并建议将 spark.sql.thriftServer.incrementalCollect 设置为 true(该参数对结果集的读取效率具有显著影响);

采用 NLB 技术部署三个 SparkThrift 服务实例以实现流量的均衡分配,并在单个节点发生异常时会自动切换到仍然能够正常运行的服务节点。

3. 定时报表任务分散执行;

4. 定期分析日志,优化查询结果集数据量大和引发 OOM 的 TopSQL。

Kyuubi 配置使用经验,权限,资源隔离,配置优化总结

Apache Kyuubi 是一个提供标准化 JDBC 接口的开源项目。它基于Spark引擎实现高性能的数据查询功能。该系统采用水平扩展技术结合负载均衡策略与引擎缓存机制来提升并发处理能力和响应速度。此外配备身份认证服务确保数据安全性适用于ETL流程生成报表以及实时数据分析需求具备高度稳定性和可靠性的同时确保系统安全性

一个 Kyuubi 服务节点典型的参数配置参考下图:

​编辑

伴随最新版本的发布及新增功能的推出,在持续跟进中不断优化各项性能参数

OLAP引擎之 Redshift****

Redshift 作为亚马逊云科技全面托管的云原生物库解决方案,在几分钟内即可构建包含至少数十节点的数据仓库集群,并可灵活配置以适应不同业务需求。它支持高复杂度数据查询语句下的高性能并行查询操作,并通过COPY/UNLOAD命令实现便捷的数据与S3环境之间的交互功能;特别地,在无需本地数据加载的情况下即可执行基于S3上数据的外表查询操作。
该解决方案具备极低的运维成本特征,并结合机器学习算法实现自动优化功能。
在生产环境下的性能表现方面:

  • 50% 的查询响应时间小于10秒
  • 70% 的查询响应时间小于20秒
  • 80% 的查询响应时间小于30秒
  • 90% 的查询响应时间小于60秒
  • 95% 的查询响应时间小于80秒
  • 98% 的查询响应时间小于180秒
  • 99% 的查询响应时间小于360秒
  • 该功能能够支撑突增的并发查询请求,并持续保障高效的查询性能。在排队期间,在瞬态ConcurrentScaling集群上进行包括COPY、INSERT、UPDATE和DELETE等操作的数据写入。当计算资源紧张时,迅速调用现有集群数量几倍增加的数据处理能力来提高速度。

以下是工作日生产环境并发扩展集群使用情况:

  • 扩容的故障问题

在过去的半年中, 团队遭遇了Redshift的一些服务故障, 例如, 在处理天级别规模的任务时进行了定时扩容调度操作出现故障, 并且在集群规模的操作中因超时未能完成而导致回退现象发生.亚马逊云科技的技术支持团队通过内部排查发现了问题并修复了相关的Bug, 并已成功修复问题.

Ranger的权限控制****

该系统提供了四种权限控制方案(EMR 插件与 Ryza 的 metastore 插件等)。最初尝试部署了 EMR 插件以解决集群问题;但因需采用 Kerberos 机制并不具备多主实例支持而放弃这一方案。目前主要采用的是 Ryza 的 metastore 插件(简称 Ryza)。其部署过程简单直观,并且维护较为便捷——一个插件即可覆盖 Hive Spark Trino 和 Presto 等常用数据库;但其在视图权限的管理上存在一定的不足——如果将用户的视图权限开放则需确保其包含的所有表也相应地赋予相应的权限以实现安全策略的一致性;因此决定转向 Ryza 的 Range 操作插件进行测试评估与优化设计。

无需额外选择 EMR Ranger Kerberos。其运行维护费用高昂且架构管理较为繁琐;故障难以快速定位及处理。适用于有一定Kerberos使用基础的用户,并且主要应用于单主系统环境

若Spark、Presto、Trino仅管理表级权限,则可选择使用Metastore插件,并只需部署一个插件即可完成任务;此功能不支持基于Group/Role的策略配置(Policy),但可以通过Policy中新增多于一个用户的设置来实现扩展性需求。

如果需要管理列级别权限,则可以使用Ranger集成 plugins + Kyuubi Ranger plugin。实际上,并非需要安装KyubI而是采用Spark的Ranger插件。最初版本名为Spark-Ranger后被整合到Kyuubi项目中支持Spark 3.x并可独立部署。

最终的成果

基于前期详实的技术调研结果基础上

1. 大数据集群的运行硬件和维护成本降低了 30%;

该集群通过扩展节点的方式实现了算力、存储和网络流量的提升,并优化了其性能。

开发了EMR和Redshift集群以及定时、分钟级多种动态弹性扩缩容方案,并旨在解决业务周期性波动带来的资源压力问题,在提升数据检索速度的同时实现了100%的效率提升

4. 数据架构完成云原生升级与优化,并实现了数据实时归湖与计算分离的技术架构。该架构为后续提供多样化的实时与非实时业务能力支持。如生成式人工智能(GPT)、智能湖仓系统以及动态用户画像分析等应用领域均能受益于此。

5. 大数据中心中台的上线实现了对数据开发、测试、发布及运维流程的规范化管理,并显著提升了40%的开发效率。通过引入数据分析功能如数据地图和数据分析血缘等模块的应用,企业能够更高效地进行数据分析工作。此外,在这一过程中形成的统一的数据资产管理体系还显著降低了企业间协作所需的数据沟通成本。

未来的展望和规划

**实时计算 **& 准实时湖仓一体

随着 AIGC 技术快速增长并得到广泛应用,未来数据的增长趋势将呈现实时、高质量及大规模的特点.为了有效应对这一趋势,要求建设准 realtime 数据湖仓,从而实现对数据进行实时采集,并在数据抵达仅仅几秒钟至几分钟的时间内完成处理与存储,从而使系统能够迅速提取有价值的信息进而有效响应当前业务环境下的实时决策需求.

Serverless做为现有架构未来演进方向****

Amazon EMR 和 Redshift 的 Serverless 技术能够为企业提供更具灵活性、高效率的同时实现低成本与可靠性相结合的数据处理方案。采用无服务模式后企业可以根据实际需求按需支付费用从而有效降低服务器运行及维护的成本并且显著缩短部署周期。基于动态的工作负载调整计算资源规模使得系统能够更好地适应业务需求的变化。随着国内亚马逊云科技对 Serverless 技术的支持逐步到位在线即用与弹性伸缩节点的部署模式逐渐被Serverless架构取代以进一步优化成本控制并满足日益增长的企业级数据处理需求

尝试 Amazon EMR Auto Scaling

现有生产环境中的弹性伸缩策略主要依赖于基于经验的定时任务机制。然而,这种策略通常缺乏灵活性和及时性响应,在某些突发的业务高峰期需要频繁的人工干预来调节资源分配。为了平衡弹性伸缩与稳定性之间的关系,在保证系统稳定性的前提下,考虑启用EMR自带的自动扩展功能以提升集群的整体弹性。通过这一改进措施,不仅能够避免因资源过度或不足分配而导致的浪费问题,并有效降低集群管理所需的人力和时间成本。

全部评论 (0)

还没有任何评论哟~