写hive到clickhouse的脚本遇到的问题及解决办法
文章目录
- 背景
- 步骤
背景
最近有一个需求就是要将hive数据库中的数据导入到clickhouse中。目前的方法是先使用waterdrop工具将hive数据库的数据导入到clickhouse的本地表中随后再进行清洗处理并将数据写入到分布式表中。手动操作已经完全足够满足需求但希望将其转变为定时任务调度由于我们这边的大数据集群与clickhouse系统是分开部署的因此需要通过远程方式执行clickhouse命令下面记录了我在编写脚本过程中遇到的问题及解决方案。
步骤
在构建水滴架构时,在系统中配置相应的参数并完成环境调试后,在项目管理界面中依次执行以下操作:启动水滴服务并设置关联的数据库配置项;随后将待处理的数据集合按照指定的格式导入到ClickHouse数据库中;具体操作步骤可参考<>这一参考资料获取详细指导
在梳理编写脚本的心路历程……原谅我对Linux的理解尚浅。在工作中最常遇到的问题之一就是希望能够方便地执行一些远程操作。最直接想到的工具就是使用SSH协议连接到ClickHouse服务器进行操作。为了方便后续操作,运维团队帮助我们开通了ClickHouse集群上的免密认证权限。这一权限仅限于我们当前需要使用的服务器。
我写的脚本主要分为以下几个步骤:
- 允许传递日期参数是必要的前提条件之一, 因为本系统采用的是基于日区间的Hive数据源进行处理。
- 在导入操作前, 必须确保清除本地存储的Hive数据, 这是为了防止潜在的失败情况导致本地数据库出现重复记录。
- 通过WaterDrop工具将Hive数据同步至ClickHouse数据库, 这一过程确保了数据在不同存储层之间的高效传输。
- 在数据清洗流程结束后, 必须以分布式存储架构写入目标表, 并在整个系统中验证一致性信息。
- 为了保证系统的稳定运行, 在每次操作完成后都需要清除本地存储的Hive数据。
第一步很简单,脚本如下:
# 日期及参数配置输入,默认当天无参数输入
if [ $# == 0 ];then
daydate=`date +%Y-%m-%d`
elif [ $# == 1 ];then
daydate=$1
else
echo "输入参数出错!"
exit 1
fi
第二步由于Sql语句相对简单易懂,并没有遇到什么困难的问题;通过简单的SSH远程命令即可完成
ssh root@xxx.xxx.x.xxx 'clickhouse-client -u default --password m8yjvWQ+ --query="truncate table test.bi_player2"'
因为ssh命令也采用结合使用单双引号的方式操作,并非单独使用单一或双引号;只需确保与ClickHouse命令中的引号格式不冲突即可。
第三阶段,在同步数据时可直接调用waterdrop服务。然而由于waterdrop平台的配置文件无法直接传递参数解决方案可参考水滴配置文件动态赋值一文中的相关内容
脚本如下:
cat>/opt/waterdrop-1.5.1/job/ods_bi_player2.conf<<!EOF
spark {
spark.sql.catalogImplementation = "hive"
spark.app.name = "hive2clickhosue_player2"
spark.executor.instances = 30
spark.executor.cores = 1
spark.executor.memory = "2g"
}
input {
hive {
pre_sql = "select user_id,level,country,vip,rmb_diamonds,all_diamonds,money,wood,forage,mineral,iron,feats,card_soul,corps_money,treasure_money,official,building,castle_lv,genius,m_power,guide_step,record_time,corps_uid,horseshoe_gold,bi_sid,gather_time,if(record_time is null,dateline,from_unixtime(record_time,'yyyy-MM-dd')) as dateline,bi_pid from test.ods_bi_player2 where dateline = '$daydate'"
table_name = "bi_player2"
}
}
filter {}
output {
clickhouse {
host = "xxx.xxx.xx.xxx:8123"
database = "test"
table = "bi_player2"
fields = ["user_id","level","country","vip","rmb_diamonds","all_diamonds","money","wood","forage","mineral","iron","feats","card_soul","corps_money","treasure_money","official","building","castle_lv","genius","m_power","guide_step","record_time","corps_uid","horseshoe_gold","bi_sid","gather_time","dateline","bi_pid"]
username = "default"
password = "m8yjvWQ+"
bulk_size = 100000
}
}
!EOF
# 执行waterdrop任务 把hive的数据写入clickhouse
/opt/waterdrop-1.5.1/bin/start-waterdrop.sh --master yarn --deploy-mode client --config /opt/waterdrop-1.5.1/job/bi_player2.conf
第四步 数据清洗,写入分布式表
从之前的分析来看第四步似乎并没有什么问题显得很简单。
因为ClickHouse分布式表的数据量较大 所以我选择了按日期进行分区。
用于删除ClickHouse分区的命令如下:
alter table xxx drop partition "2018-08-08"
客户端(非交互式)命令:clickhouse-client -u default --password m8yjvWQ+ --query="alter table xxx drop partition '2017-12-31'"
那么问题是这样的 命令中包含有双引号 如果与SSH一起执行可能会导致错误 需要注意如何处理这个问题。
思考了一下之后发现:由于命令无法被执行,则需要编写并运行一个shell脚本,在clickhouse服务器上创建并运行一个shell脚本,并将所有需要执行的SQL语句传递进去。然后直接提交给clickhouse数据库进行处理。具体操作步骤如下:
随后,在clickhouse数据库中编写了一个用于shell操作的程序设计较为简单的脚本程序
其中变量$script_name表示用于执行特定操作的命令行参数
#!/bin/bash
# clickhouse执行sql 供远程调用
clickhouse-client -u default --password m8yjvWQ+ --query="$*"
备注:$* 就可以获取到所有的参数形成一个字符串,中间是通过空格分开的。
- ssh远程调用,例如清空分布式表数据
# 清空分布式表的数据
sql="alter table test.bi_player2 on cluster yk_ck_cluster drop partition \'$daydate\'"
ssh root@192.168.0.198 /home/test/ck_query_sql.sh $sql
备注:特殊字符需要转义
第五步就和第四步一样的了
整个脚本如下:
#!/bin/bash
#执行之前需要source一下环境变量
source /opt/Bigdata/client/bigdata_env
# 日期及参数配置输入,默认当天无参数输入
if [ $# == 0 ];then
daydate=`date +%Y-%m-%d`
elif [ $# == 1 ];then
daydate=$1
else
echo "输入参数出错!"
exit 1
fi
echo "当日:$daydate"
# 远程清空ck-0001的本地表
sql="truncate table test.bi_player2"
ssh root@xxx.xxx.xx.xxx /home/test/ck_query_sql.sh $sql
# 打印数据传输脚本并赋值
cat>/opt/waterdrop-1.5.1/job/bi_player2.conf<<!EOF
spark {
spark.sql.catalogImplementation = "hive"
spark.app.name = "hive2clickhosue_player2"
spark.executor.instances = 30
spark.executor.cores = 1
spark.executor.memory = "2g"
}
input {
hive {
pre_sql = "select user_id,level,country,vip,rmb_diamonds,all_diamonds,money,wood,forage,mineral,iron,feats,card_soul,corps_money,treasure_money,official,building,castle_lv,genius,m_power,guide_step,record_time,corps_uid,horseshoe_gold,bi_sid,gather_time,if(record_time is null,dateline,from_unixtime(record_time,'yyyy-MM-dd')) as dateline,bi_pid from test.bi_player2 where dateline = '$daydate'"
table_name = "bi_player2"
}
}
filter {}
output {
clickhouse {
host = "xxx.xxx.xx.xxx:8123"
database = "test"
table = "bi_player2"
fields = ["user_id","level","country","vip","rmb_diamonds","all_diamonds","money","wood","forage","mineral","iron","feats","card_soul","corps_money","treasure_money","official","building","castle_lv","genius","m_power","guide_step","record_time","corps_uid","horseshoe_gold","bi_sid","gather_time","dateline","bi_pid"]
username = "default"
password = "m8yjvWQ+"
bulk_size = 100000
}
}
!EOF
# 执行waterdrop任务 把hive的数据写入clickhouse
/opt/waterdrop-1.5.1/bin/start-waterdrop.sh --master yarn --deploy-mode client --config /opt/waterdrop-1.5.1/job/bi_player2.conf
# 清空分布式表的数据 单引号需要转义
sql="alter table test.bi_player2 on cluster yk_ck_cluster drop partition \'$daydate\'"
ssh root@xxx.xxx.xx.xxx /home/test/ck_query_sql.sh $sql
# 远程执行ck的命令 把数据写入到分布式表中 *号需要转义
sql="insert into test.bi_player2_all select \* from test.bi_player2"
ssh root@xxx.xxx.xx.xxx /home/test/ck_query_sql.sh $sql
# 清空本地表的数据
sql="truncate table test.bi_player2"
ssh root@xxx.xxx.xx.xxx /home/test/ck_query_sql.sh $sql
参考如下:
ssh 远程执行命令
