CDH安装ELK及简单应用
安装elasticsearch
CDH中没有官方提供Elastic的parce包,但是问题不大,CDH提供了一个编译工具可以自行打parcel包。对于其他的分布式工具大部分都可以通过这种方式进行CDH安装,由CDH进行统一的分配、配置管理。既然有CDH这个管理工具,当然就尽量把所有组件都进行统一管理了。
安装git、maven
yum install git
git version
wget https://apache.website-solution.net/maven/maven-3/3.6.3/binaries/apache-maven-3.6.3-bin.tar.gz
tar -xzvf apache-maven-3.6.3-bin.tar.gz
vim /etc/profile
#这里写你自己的maven解压目录
export MAVEN_HOME=/usr/maven-3.6.3
export PATH=$PATH:$MAVEN_HOME/bin
source /etc/profile
mvn -version
安装cloudera manager编译工具
mkdir -p github/cloudera
cd github/cloudera/
//克隆安装工具git文件
git clone https://github.com/cloudera/cm_ext.git
cd cm_ext
//加载项目中的依赖包
mvn package
mvean的依赖比较多,需要多等一段时间,如果有发现长时间停滞可以ctrl+C中断然后重新加载。下载elasticsearch的tar.gz安装包,官网有不同版本的,选择一个合适的版本。
封装parcel包
git克隆一个elasticsearch的打包工具
git clone https://github.com/ibagomel/elasticsearch-parcel.git
cd elasticsearch-parcel
//使用工具对tar.gz文件进行打包操作
POINT_VERSION=5 VALIDATOR_DIR=/root/github/cloudera/cm_ext OS_VER=el7 PARCEL_NAME=ElasticSearch ./build-parcel.sh /root/github/cloudera/elasticsearch/elasticsearch-6.3.1.tar.gz
//进行CSD校验
VALIDATOR_DIR=/root/github/cloudera/cm_ext CSD_NAME=ElasticSearch ./build-csd.sh
//进入目录查看parcel文件
cd build-parcel/
ls
搭建本地库
主要是就是利用httpd搭建一个本地库提供parcle包,然后把csd文件拷贝到cloudera目录中。
mkdir /var/www/html/elastic-parcel
cd /root/github/cloudera/elasticsearch-parcel/build-parcel/
cp ELASTICSEARCH-0.0.5.elasticsearch.p0.5-el7.parcel /var/www/html/elastic-parcel/
cp manifest.json /var/www/html/elastic-parcel
cd /root/github/cloudera/elasticsearch-parcel/build-csd
cp ELASTICSEARCH-1.0.jar /opt/cloudera/csd/
//CDH容易报一个无权限访问目录的错误,提前给足够的权限
chmod 777 /opt/cloudera/parcels/ELASTICSEARCH-0.0.5.elasticsearch.p0.5/config/
然后重启clouderamanager,即可在CDH管理WEB上加载新的parcel。
/opt/cloudera-manager/etc/init.d/cloudera-scm-server restart
安装kibana
kibana并不是分布式的组件,不需要通过CDH统一管理。
上传解压配置启动即可。
数据可视化
数据处理
原有字段中包含year、month、day、hour和其他数值数据,在elasticsearch中,以date格式存储会更方便数据的使用。elasticsearch可以将"yyyy-MM-dd"格式的字符串自动解析成date格式,所以这里需要将year、month、day三个字段拼接字符串,并且group聚合。
CREATE TABLE pekingweather AS SELECT
MIN(`no`) AS `no`,
CONCAT_WS(
'-',
`year`,
LPAD(`month`, 2, 0),
LPAD(`day`, 2, 0)
) AS date,
id,
ROUND(AVG(co), 2) AS co,
ROUND(AVG(pm25), 2) AS pm25,
ROUND(AVG(so2), 2) AS so2,
ROUND(AVG(no2), 2) AS no2,
ROUND(AVG(pm10), 2) AS pm10,
ROUND(AVG(o3), 2) AS o3,
ROUND(AVG(temp), 2) AS temp,
ROUND(AVG(pres), 2) AS pres,
ROUND(AVG(dewp), 2) AS dewp,
ROUND(AVG(rain), 2) AS rain,
ROUND(AVG(wspm), 2) AS wspm,
b.street,
CONCAT(latitude, ',', longitude) AS coordinate
FROM
peking a
JOIN station_list b ON a.id = b.station_id
GROUP BY
CONCAT_WS(
'-',
`year`,
LPAD(`month`, 2, 0),
LPAD(`day`, 2, 0)
),
id
数据导入
搭建成功后,将mysql中的原有数据导入elasticsearch。
采用编写spring项目的方式,调用elasticsearch的api进行逐条导入。
package com.example.demo.bean;
import javax.persistence.Id;
public class pekingweather {
//注意这里务必使用包装类,使用原生double会导致数据导入为零
@Id
private String no;
private String date;
private String id;
private Double co;
private Double pm25;
private Double so2;
private Double no2;
private Double pm10;
private Double o3;
private Double temp;
private Double pres;
private Double dewp;
private Double rain;
private Double wspm;
//使用getter and setter
@Test
public void inputWeather() throws IOException {
// 封装成java对象
List<pekingweather> weatherInfos = pekingweatherMapper.selectAll();
System.out.print(weatherInfos.size());
// 打开es链接
// 封装es命令对象
for (pekingweather weatherInfo : weatherInfos) {
Index index = new Index.Builder(weatherInfo).index("peking_new").type("weather").id(weatherInfo.getNo()).build();
// 导入es数据
jestClient.execute(index);//es的命令
}
}
其他的spring东西就不写了,都是框架的东西。坐标位置需要手动修改成geo_point类型,默认读取为string。而elasticsearch索引不能直接修改字段类型,需要建立中间索引来实现字段类型修改。
首先建立一个正确的索引格式
PUT peking
{
"mappings": {
"weather": {
"properties": {
"co": {
"type": "float"
},
"coordinate": {
"type": "geo_point"
},
"date": {
"type": "date"
},
"dewp": {
"type": "float"
},
"id": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"no": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"no2": {
"type": "float"
},
"o3": {
"type": "float"
},
"pm10": {
"type": "float"
},
"pm25": {
"type": "float"
},
"pres": {
"type": "float"
},
"rain": {
"type": "float"
},
"so2": {
"type": "float"
},
"street": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"temp": {
"type": "float"
},
"wspm": {
"type": "float"
}
}
}
}
}
然后将数据导入到新建的索引
POST _reindex
{
"source": {
"index": "peking_new"
},
"dest": {
"index": "peking"
}
}
事后再删除peking_new索引。
数据访问
先查询一下看看数据有无错误。
GET peking/weather/_search
大体确定没有错误以后,进入ManagerMent->Index Patterns
搜索框中输入你的索引名,然后第二步是选择日期列,默认日期格式yyyy-MM-dd的字符串。
图表生成
进入Visualize->Create a visualization,以折线图(Line)为例。
Y轴选择Max-co和Max-no2两种
x轴选择Date Histogram,选择date类型,右上角可以选择时间窗。

