那些年我们在spark SQL上踩过的坑
潜心钻研延云YDB整整一年后回头一看这一年的 spark 开发之路充满了挑战与教训
这一年在使用 spark 的过程中真心遇到了不少困难
spark 内存泄露
1.高并发情况下的内存泄露的具体表现
令人Regretful的是(spark的设计架构)并非专为处理高并发请求而构建。
在实验中将集群设置为低网络质量环境,并提交了100次并发查询请求,在持续运行约3天后发现系统内存出现泄漏。
在对小规模SQL查询的压力测试过程中进行了观察,并发现了较多数量的activejob停留在spark UI界面中的等待执行的状态,并且这些activejob持续运行中。如图所示

b)并且发现driver内存爆满

c)用内存分析分析工具分析了下

2.高并发下AsynchronousListenerBus引起的WEB UI的内存泄露
SPARK在短时间内生成了大量的SQL,并且这些SQL中包含了大量的联合和连接操作,在执行过程中将导致大量事件对象被创建,并将使得这里的事件数量激增到10000个以上。
当事件数量达到10,000时开始丢弃 event用于回收资源;资源被丢弃后就无法回收了。 因其涉及UI页面的问题而取消了队列长度限制。


3.AsynchronousListenerBus本身引起的内存泄露
抓包发现


这些event是通过post方法传递的,并写入到队列里

但是也是由一个单线程进行postToAll的

在高并发场景下,在处理大量请求时,“$``single-threaded postToAll操作的速度较后端$``post服务稍慢”。这会导致事件排队数量不断增加,“ if $ $ 持续性的高强度SQL查询被提交进来 ` `”,这将引发内存泄漏问题。
接下来我们在深入研究一下postToAll的方法时发现,在这条路径上运行缓慢的原因主要与事件处理相关的逻辑有关

您难以想象,在借助jstack进行抓取与分析后发现,程序长时间停滞于日志记录页面
可以通过禁用这个地方的log来提升event的速度
log4j.logger.org.apache.spark.scheduler=ERROR

4.高并发下的Cleaner的内存泄露
说到此处,Cleaner的设计方案可以说是spark中最糟糕的一个设计。 spark中的ContextCleaner主要用于回收与清理已经完成的 Broadcast 对象及 shuffle 数据。然而,在高并发场景下,我们发现这个问题会导致大量数据堆积,最终将导致 driver 内存耗尽而崩溃
l我们先看下,是如何触发内存回收的

确实就是利用System.gc()回收的内存。如果我们在JVM里设置了禁止执行System.gc的话,则这个逻辑就会变得无效(通常建议禁用system.gc)
lclean过程
该逻辑属于单线程模式,并且每个阶段都需要多台机器协作才能完成。其运行效率较低,在SQL事务并发度较高时生成的速度超过了系统的处理能力。这将导致该driver出现内存泄漏问题。若广播任务占用过多系统资源,则会导致大量使用本地磁盘的小文件。在实际测试中观察到长时间高强度并行工作状态下本地磁盘用于存储blockmanager目录的数据占据了约60%的空间。

我们再来分析下 clean里面,那个逻辑最慢

真正存在的性能瓶颈在于 blockManagerMaster 中的 removeBroadcast 由于该逻辑涉及跨机器通信
针对这种问题,
我们增加了在SQL层上的一个名为SQLWAITING的功能模块,并对堆积长度进行了检测或评估。当累积到某个设定阈值时(即达到或超过我们的设定值),从而阻止新进入的SQL语句执行以避免潜在的问题)。配置文件路径为$conf/ya100_env_default.sh中的ydb.sql.waiting.queue.size字段可被修改以调节该阈值的具体大小

l建议集群网络带宽设置得更大一些,在提升性能方面具有明显优势;相比于千兆网络,在处理数据流量时万兆网络的表现更为高效
l给集群休息的机会,不要一直持续性的高并发,让集群有间断的机会。
增加 spark 的线程池数量,并通过修改 spark-defaults.conf 中的某些参数来优化以提高性能

5.线程池与threadlocal引起的内存泄露
发现 spark、hive 和 lucene 都非常偏爱通过 threadlocal 管理临时 session 对象,并希望在 SQL 执行完成后这些对象能够自然释放。然而 spark 同时采用了线程池,在其内部的线程却始终无法终止运行。这导致相关资源长时间无法释放并持续占用内存。
为了解决这一问题,延云对 spark 关键线程池的实现进行了优化,并将其设置为每小时自动更换新的线程池。旧有的线程数量会自然地被释放。
6.文件泄露
您会注意到当会话数量增多时 spark 会在 HDFS 和本地硬盘上创建了大量的磁盘目录 最终会导致本地硬盘与 HDFS 目录过载而使整个文件系统的运行出现严重问题 为了应对这一挑战 在 YDB 中我们已经实施了相应的解决方案
7.deleteONExit内存泄露


为什么会有这些对象在里面,我们看下源码


8.JDO内存泄露
多达10万多个JDOPersistenceManager





9.listerner内存泄露
通过对spark集群的日志进行详细分析和研究,在实际应用中发现其事件监听功能(listener)在随着运行时间的增长而逐渐减缓。
发现所有代码都卡在了onpostevent上

jstack的结果如下

深入研究了调用逻辑的工作流程后发现其操作模式为反复调用特定组件其中每个组件在无内容执行时才会触发生成上述jstack屏幕截图

通过内存发现有30多万个linterner在里面

发现都是大多数都是同一个listener,我们核对下该处源码

最终定位问题
确系所在位置存在一个BUG问题, 每当启动JDBC连接时, spark系统会自动添加一个listener. 随着时间推移, listener的数量会不断增加. 为此, 我仅需修改少量代码即可解决问题. 随后将进行下一步的压力测试.

二十二、spark源码调优
实验证明即使数据量极少 采用Spark技术的一次SQL操作均需花费一秒的时间 对于大量实时查询而言 这一等待过程会给用户体验带来显著的困扰 为此 我们深入研究并针对性地优化了Spark和Hive的具体代码实现 在这一改进方案下 响应时间得以压缩至200-300毫秒之间
以下是我们改动过的地方
1.SessionState 的创建目录 占用较多的时间

另外,在使用Hadoop namenode HA时,在某些情况下需要注意以下几点:当第一个namenode处于standby状态时,在某些情况下可能会变慢,并不只是一秒;因此,在改动源码之前,请确保active节点排在前面以避免潜在的问题。
2.HiveConf的初始化过程占用太多时间
经常性的hiveConf配置初始化过程必须解析核心配置文件:core-default.xml,hdfs-default.xml,yarn-default.xml
MapReduce默认配置文件和Hive默认配置文件等多个XML文件, 而这些XML文件是嵌入到JAR包中的.
第一部分,在解压 jar 包时会占用较多的时间;第二部分,在每次解析 XML 文件时也会占用较长的时间。



3.广播broadcast传递的hadoop configuration序列化很耗时
序列化的实现采用了压缩策略来优化过程,并存在全局锁的风险
在每次序列化过程中, lconfiguration发送了过多的不必要的配置项,数百个配置参数中占用了约60KB的空间.经过优化后去除了冗余配置项,最终将所需传输的关键配置参数减少至仅44个,仅占用约2KB的空间.


4.对spark广播数据broadcast的Cleaner的改进
由于SPARK-3015 的BUG,spark的cleaner 目前为单线程回收模式。
大家留意spark源码注释

其中的性能瓶颈点主要体现在广播数据的清理逻辑上。因为需要跨越多台机器完成任务分配与数据同步工作流程,借助akka协议实现网络交互。
如果回收并发规模显著增大,则 SPARK-3015 的 bug 报告将导致网络拥塞,并将引发大量 timeout 事件发生。
回收量突然大幅增加的原因是什么? 其实是因为cleaner的核心机制按照固定周期运行,在默认设置下持续30秒或进行GC后才会启动。这使得短时间内产生大量akka进程同时运行,并集中释放内存资源。因此网络性能会出现明显下降的情况自然就不难理解了。
但是,在单线程回收模式下意味着recycling speed remains constant. If query concurrency becomes extremely high, the recycling speed will not be able to keep up with the cleaner’s pace, resulting in an accumulation of cleaners and causing a process to enter an Out Of Memory (OM) state (YDB has implemented corresponding optimizations to limit the concurrency of foreground queries).
无论是在OOM场景还是在处理能力受限的情况下
对于官方机构提出的这一做法, 我们认为它并不是一个完美无缺的解决方案. 必须支持并发回收机制, 只要能够解决Akka出现的时间超限问题即可. 因此我们需要深入分析这一问题. 为什么会出现这种情况? 由于清洁器占用过多资源导致了该现象. 那么我们是否可以优化设置以减少清洁器的同时处理请求数量呢? 比如说将清洁器的最大并发数设置为4个, 而不是默认全部分配满载. 这样既能提高清洁器回收效率, 又能从根本上解决问题并非是更好的选择.
针对这一问题,在经过一系列分析和权衡后
