Advertisement

都2024年了!是谁还不会优化 Hive 的小文件啊!!!速看!

阅读量:

文章目录

    • 小文件产生的原因

      • 1.查询建表或者插入
      • 2.装载数据
      • 3.动态分区
    • 小文件影响

    • 解决方法

      • 针对已经存在的小文件进行优化
        • 1.小文件归档
    • 2.getmerge

    • 3.concatenate

    • 4.重写

      • 针对写入数据时的优化
        • 1.调参优化
    • 2.动态分区优化

    • 3.使用 Spark 算子控制小文件数量

查看 HDFS 上的文件时,无意间点进了 Hive 表的存储目录下,打开发现其中有许多的小文件,如下所示:
在这里插入图片描述

每个文件都是几个 KB,都占用了一个块,这种就是典型的小文件。那么通过这篇博客,一起来学习如何解决 Hive 中出现的小文件问题。

注意,博主使用的 Hive 版本为3.1.3,不同版本之间可能存在微差,但整体影响不大。

小文件产生的原因

产生小文件绝大多数都是和 Reduce 相关的,因为它决定了我们最终的输出文件数量,主要有以下几个场景:

1.查询建表或者插入

当我们通过查询建表或者通过查询的方式将数据插入的时候,就有可能会产生小文件,如下所示:

复制代码
    create table test_a select * from tmp;
    
    insert into test_a select * from tmp;

2.装载数据

使用 load 语句装载或者 insert 语句直接将数据装载到表中,也可能会产生小文件。

当使用 load 时,导入多少个文件,在 Hive 表中就会生成多少个文件:

复制代码
    load data local inpath 'xxx' overwrite into table test_a;

当使用 insert 直接插入数据时,它会启动 MR 任务,有多少个 Reduce,就会输出多少个文件:

复制代码
    insert into test_a values (1,'a'),(2,'b'),(3,'c');

3.动态分区

在 Hive 中使用动态分区时,容易产生大量的小文件。

这主要是由于动态分区插入数据的方式导致的,在每次插入数据时,Hive 可能会为每个分区创建一个新的文件。如果插入的数据量较小或者插入操作频繁,就会导致产生大量的小文件。

小文件过多会对Hive和底层存储HDFS产生负面影响。此外,小文件过多也会使得NameNode的元数据变得庞大,占用过多内存,1。

小文件影响

我们都知道在 HDFS 中,所有文件的元数据信息都存储在 NameNode,也就是命名空间中,它运行在有限的内存里。

HDFS 上每个文件的元数据信息占用 150B 左右的空间,一旦小文件过多,就会影响 HDFS 的性能,还可能撑爆 NameNode 的内存,造成集群宕机,无法提供服务,这就是为什么要处理小文件的根本原因。

同时,对于 Hive 来说,每个小文件在查询时都会被当作一个块,并启动一个 Map 任务来完成,但这种情况下 Map 任务的启动和初始化时间通常远大于逻辑处理时间,这样就会导致大量的资源浪费,降低程序性能。

解决方法

针对已经存在的小文件进行优化

1.小文件归档

在 Hadoop 中,提供了一种小文件归档技术,它可以将一个目录下的所有小文件都打包成一个 HAR 文件,也就是说,它只对分区表有效

复制代码
    -- 是否开启归档操作,默认 false
    set hive.archive.enabled=true;
    -- 是否允许Hive在创建归档时可以设置父目录(该参数有些Hive版本中已经弃用,无需设置)
    set hive.archive.har.parentdir.settable=true;
    -- 控制需要归档文件的大小
    set har.partfile.size=1099511627776;
    
    -- 使用语法
    ALTER TABLE table_name ARCHIVE PARTITION (partition_col = partition_col_value, partition_col = partiton_col_value, ...)
    
    -- 归档示例
    ALTER TABLE test_a ARCHIVE PARTITION(dt='2024-01-01');
    
    -- 解档示例
    ALTER TABLE test_a UNARCHIVE PARTITION(dt='2024-01-01');

归档之后只能对该表进行查询操作,不支持写入,且查询效率会降低,因为每次都需要 HAR 中读取数据,造成额外的开销。

使用归档操作时,可能会发生报错,如下所示:
在这里插入图片描述

FAILED: Execution Error, return code 1 from org.apache.hadoop.hive.ql.exec.DDLTask. org/apache/hadoop/tools/HadoopArchives

通过 Hive 的运行日志找出详细原因(日志位于 /tmp/当前用户/hive.log 中):
在这里插入图片描述
从日志中可以看出,Hive 在执行归档操作时,发现找不到这个类,很显然,是因为没有 JAR 包造成的。

我们需要将 Hadoop 的归档操作包复制到 Hive 的 lib 目录下:

复制代码
    cp $HADOOP_HOME/share/hadoop/tools/lib/hadoop-archive*.jar $HIVE_HOME/lib

重启 Hive 元数据服务 ,再次执行归档操作:
在这里插入图片描述

没有出现问题,归档完成。

归当前
在这里插入图片描述

归档后

归档后,会有形成一个目录 data.har,目录里面存储的就是归档的数据以及信息。
在这里插入图片描述

如果你尝试往已经进行归档的表中写入数据时,会直接报错:
在这里插入图片描述

但可以查询:
在这里插入图片描述

执行解档后,它会将数据以及目录结构恢复成归档前的样子,像无事发生一样。
在这里插入图片描述

2.getmerge

在 Hadoop 中,提供了一个专用于合并 txt 格式的小文件命令 getmerge

先通过 getmerge 命令将小文件合并到本地路径中(操作用户需要有该目录的权限),然后将旧文件删除,上传合并后的文件,完成数据加载。

操作示例:
在这里插入图片描述

复制代码
    # 合并小文件到本地
    hadoop fs -getmerge  /user/hive/warehouse/skew_db.db/test_a/dt=2024-01-01/* /opt/module/test/test_a/2024-01-01

在这里插入图片描述
可以看到它将小文件合并到了指定的本地路径中。

复制代码
    # 删除 HDFS 上的旧文件
    hadoop fs -rm  /user/hive/warehouse/skew_db.db/test_a/dt=2024-01-01/*
    
    # 上传合并完成的文件
    hadoop fs -put /opt/module/test/test_a/2024-01-01 /user/hive/warehouse/skew_db.db/test_a/dt=2024-01-01

数据上传完成后,直接进入 Hive 查询即可:
在这里插入图片描述

3.concatenate

在 Hive 中,自带了一个合并小文件的命令 concatenate,但它有一定局限性,只能用于 ORC 和 RCFILE 格式的表。使用该命令时,可能无法一次性将所有小文件都合并完成,需要执行多次。

这里以 ORC 格式的表进行举例,建表示例如下:

复制代码
    CREATE TABLE IF NOT EXISTS test_a (
    key string,
    value int
    ) ROW FORMAT DELIMITED
    FIELDS TERMINATED BY '\t'
    LINES TERMINATED BY '\n'
    STORED AS ORC;

我通过 insert 向测试表中插入了大量的数据,共形成 33 个小文件,每个都在 2KB 左右,如下所示:
在这里插入图片描述

现在通过 concatenate 命令来对小文件进行合并操作:

复制代码
    # 非分区表
    alter table test_a concatenate;
    
    # 分区表
    alter table test_a partition(dt=xxx) concatenate;

我创建的是非分区表,合并过程如下:
在这里插入图片描述

它会启动一个 MR 任务去完成小合并操作,合并完成后,如下所示:
在这里插入图片描述

减少了 10 个小文件,但并没有全部合并完,我们可以再次执行该命令,以达到合并效果。

第二次执行后,如下所示:
在这里插入图片描述

可以看到,已经将全部的小文件都合并成功了。

4.重写

对于已经存在的小文件,还可以通过重写的方式对表进行覆盖写操作。

可能我们之前在往表内写入数据时,进行了 Group By、Join 等操作,对数据进行了 Shuffle 操作,产生了许多的 Reduce,所以在写入数据后产生了许多小文件。

我们可以直接对表进行查询重写操作,这样就不会产生任何 Shuffle 操作,从而就不会出现小文件。

重写前
在这里插入图片描述
重写后

复制代码
    -- 重写示例语句
    insert overwrite table test_a select * from test_a;
在这里插入图片描述

可以看到,重写操作已经成功的合并了小文件,解决的小文件问题。

针对写入数据时的优化

许多时候,我们更多的可以从源头上避免产生小文件,减少后期的优化维护。

1.调参优化

调整 Map 端、Reduce 端以及其它的参数,进行优化。

注意,下面部分参数的默认值会随着 Hive 版本的变化而产生差异!

Map 端

复制代码
    -- 让Hive在读取数据时进行一定程度的合并,减少Map任务的数量,默认开启
    set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
    
    -- 控制每个MapTask的切片大小,默认256M(决定合并后文件的数量)
    set mapred.max.split.size=256000000;   -- 256M
    
    -- 指定每个节点上MapReduce任务的最小分割大小,默认1B。(决定了多个DataNode上的文件是否需要合并)
    set mapred.min.split.size.per.node=134217728;  -- 128M
    
    -- 指定不同机架下每个MapReduce任务的最小分割大小,默认1B。(决定了不同机架上的文件是否需要合并)
    set mapred.min.split.size.per.rack=134217728;  -- 128M

Reduce 端

复制代码
    -- 指定Reduce的数量,默认为-1,表示让Hadoop根据作业的输入数据大小自动决定Reduce任务的数量
    -- 如果指定数量,则表示固定Reduce的个数
    set mapreduce.job.reduces=10;
    
    -- 指定每个Reduce的大小,由Hive根据大小自动判断出需要的Reduce个数,默认256MB
    set hive.exec.reducers.bytes.per.reducer=256000000;

Spark

复制代码
    -- 使用HiveOnSpark时,是否自动合并小文件,默认false
    set hive.merge.sparkfiles=true;

联合设置

复制代码
    -- 指定Map端输出是否进行合并,默认为true
    set hive.merge.mapfiles=true;
    
    -- 指定Reduce端输出是否进行合并,默认为false
    set hive.merge.mapredfiles=true;
    
    -- 指定合并后文件的大小,默认256M
    set hive.merge.size.per.task=256000000; 
    
    -- 设置小文件合并的阈值,默认16M,当输出文件的平均大小小于该值时,启动一个独立的MapReduce任务进行文件合并操作
    set hive.merge.smallfiles.avgsize=16000000;
    
    -- 控制Hive是否启用输出结果的压缩,默认false。虽然启用压缩可以节省存储空间,并可能在网络传输时提高效率,但也会增加CPU的开销
    set hive.exec.compress.output=true;
    
    -- 控制输出文件是否应该被压缩,默认false
    set mapreduce.output.fileoutputformat.compress=true;
    
    -- 指定压缩编码器,如果只开启压缩不指定编码格式,则没有效果。默认org.apache.hadoop.io.compress.DefaultCodec
    set mapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.SnappyCodec;
2.动态分区优化

如果在生产场景中需要用到动态分区,那么又该如何预防小文件呢?

下面直接通过一个案例来进行讲解,首先,启动 Hive 非严格模式,开启动态分区:

复制代码
    set hive.exec.dynamic.partition=true; 
    set hive.exec.dynamic.partition.mode=nonstrict;
    -- 设置每个节点最大能创建的分区数量,默认100个
    set hive.exec.max.dynamic.partitions.pernode=500;

我这里共创建了 18 个分区,其中每个分区都包含小文件,如下所示:
在这里插入图片描述

此时,我们可以使用 distribute by 进行优化,它是一种局部分区,我们将动态分区键同时也作为 distribute by 的分区键,那么就能够将相同的分区键数据分配到同一个 Reduce 中进行处理,从而避免产生小文件。

如下所示:

复制代码
    -- 读取动态分区中现有表的数据,进行优化
    insert overwrite table test_a partition(key) select value,key from test_a distribute by key;
在这里插入图片描述

如果想要控制每个分区中生成的文件数量 ,则可以使用 distribute by cast(rand()*N as int) 来进行表示,其中 N 表示生成文件的数量。

该方法受到参数 set mapreduce.job.reduces=-1; 的影响,如果你想要精准控制生成指定数量的文件,则该参数的值必须设置大于或等于 N*2(这是博主本人经过多次尝试之后找出来的规律!),否则可能无法做到控制每个分区生成的文件数量,因为它可能会受到 Hive 自动优化的影响,自动减少 Reduce 数量,从而无法控制生成文件的数量。

复制代码
    set mapreduce.job.reduces=10;
    
    insert overwrite table test_a partition(key) select value,key from test_a distribute by cast(rand()*3 as int);

我这里设置成 cast(rand()*3 as int) 表示希望每个分区目录下都只生成 3 个文件。

即使这里将 Reduce 的个数指定为 10,最终也只会有 3 个 Reduce 任务在跑,因为我们通过distribute by 限制了分发到 Reduce 中的数据分布,剩下的 Reduce 都在空跑。

合并前
在这里插入图片描述

运行完成后
在这里插入图片描述

可以看到,成功合并到了指定的文件数量(每个分区目录下均为 3 个),每个文件中的数据量都十分均匀。

通过查看 Spark Job 运行记录可以发现,共启动了 10 个 ReduceTask,但只有 3 个真正在运行,其余的都在空跑:
在这里插入图片描述

这是因为 rand() 默认生成 0~1 之间的任意随机数,通过随机数对数据进行划分,每次都是随机分配的,所以每次分配到 Reduce 上的数据都很均匀,我们就能够通过设置基数来控制最终每个分区目录下生成的文件数量。

3.使用 Spark 算子控制小文件数量

当我们在使用 Spark 时,可以通过 repartition 或者 coalesce 算子控制分区,从而控制小文件的数量。

应用场景

当你需要增加或减少分区数,并且不关心是否涉及 Shuffle 时,则可以使用 repartition 算子。

当你只需要减少分区数,并且希望避免不必要的 Shuffle 时,则可以使用 coalesce 算子,但该算子可能会造成 OOM。

我当前在 Hive 中存在 test_a 表,其中存在 33 小文件,如下:
在这里插入图片描述

现在通过 Spark 算子来进行重写,合并小文件:

复制代码
    spark.sql("select * from skew_db.test_a").repartition(1).createTempView("test_a_merge")
        
    spark.sql("insert overwrite table skew_db.test_a select * from test_a_merge")

重新刷写后如下所示:
在这里插入图片描述

如果你使用的表格式为 ORC 时,在重写时可能会报如下错误:

spark.sql.AnalysisException: Cannot overwrite a path that is also being read from.

这是因为 Spark 内置的 ORC 解析器和 Hive 中的不同,它不支持递归子目录的参数,所以会报错,这里将其禁用,使用 Hive 的解析器来执行,通过如下参数进行调整:

复制代码
    spark.sql("set spark.sql.hive.convertMetastoreOrc=false")

使用 coalesce 算子也是一样的效果,但是要注意这两个算子的应用场景。

复制代码
    spark.sql("select * from skew_db.test_a").coalesce(1).createTempView("test_a_merge")
    spark.sql("insert overwrite table skew_db.test_a select * from test_a_merge")
在这里插入图片描述

全部评论 (0)

还没有任何评论哟~