使用python和spark对2020年美国新冠肺炎疫情数据分析
主要运用Python以及Spark进行该2020年美国新冠肺炎疫情的数据分析
第一部分
第一部分
- 三、通过Spark框架实现数据处理与分析
- 四、从HDFS中提取处理后的数据并将其保存到本地存储路径
- 五、采用可视化技术展示数据特征与分析结果
- 五、归纳总结主要分析结论并提炼关键发现
- 六、参考材料
本案例以美国2020年新馆疫情数据为基础构成数据集的基础,并且该数据集的数据来源截至2020年5月19日;采用Python语言通过Spark技术进行数据分析,并在Hadoop平台环境中运行数据分析任务;以Jupyter Notebook作为开发环境进行开发操作。
一、实验平台搭建
- 安装Linux Ubuntu Kylin 16.04(因数据集较大,建议内存为3G)
- 安装Hadoop 3.1.3 安装教程
- Python 3.5(已自带Python 3.5)
- 安装Spark 2.4.0 安装教程
- 配置Python与Spark的交互 安装教程
- 安装Anaconda和Jupyter Notebook 安装教程
注:环境搭建不做详细介绍,按照上述顺序,从头搭建,基本就可以完成,上面有附带相关教程链接。
二、数据集
1.数据集来源
该数据集源自数据网站Kaggle上的美国新冠肺炎疫情相关数据集合。该集合按照data-table us-counties.csv的形式组织,并包含以下字段及其相关信息:

2.格式转换
将us-counties.csv文件复制到Ubuntu系统中的/usr/local/hadoop/data目录下,并在该目录内将CSV文件转换为TXT文件。

首先,在一个已经配置好的环境中,定位到~/jupyternotebook目录,并在此目录中打开Jupyter Notebook。


创建一个新的文件,在Jupyter Notebook环境中,并将其命名为toTxt,并将以下代码保存到该文件中。
import pandas as pd
#.csv->.txt
data = pd.read_csv('/usr/local/hadoop/data/us-counties.csv')
with open('/usr/local/hadoop/data/us-counties.txt','a+',encoding='utf-8') as f:
for line in data.values:
f.write((str(line[0])+'\t'+str(line[1])+'\t'
+str(line[2])+'\t'+str(line[3])+'\t'+str(line[4])+'\n'))

(4)运行完后,查看结果。

3.将文件上传至HDFS文件系统中
(1)启动Hadoop,并查看启动结果
./sbin/start-dfs.sh

在HDFS文件系统内建立一个名为/user/hadoop的文件夹,在此位置上后续产生的运行结果文件将被存储。
./bin/hdfs dfs -mkdir -p /user/hadoop
./bin/hdfs dfs -put /usr/local/hadoop/data/us-counties.txt /user/hadoop



三、使用Spark对数据进行分析
在Jupyter Notebook环境中创建一个新的文件并将其命名为 analist 以便存储所需的代码内容。请按照以下步骤将这些代码依次编写进该文件中:第一步 首先导入所需第三方库 并通过读取相关文件生成一个DataFrame 以便于后续的数据分析 这种方法不仅能够使程序更加高效而且还能实现从本地存储到分布式存储(如HDFS)的有效转换 以便后续的操作更加便捷 第二步 使用rdd操作来处理数据 这种方法不仅能够提高数据处理效率还能使程序运行更加稳定 这样一来整个流程将会变得更加顺畅
from pyspark import SparkConf,SparkContext
from pyspark.sql import Row
from pyspark.sql.types import *
from pyspark.sql import SparkSession
from datetime import datetime
import pyspark.sql.functions as func
def toDate(inputStr):
newStr = ""
if len(inputStr) == 8:
s1 = inputStr[0:4]
s2 = inputStr[5:6]
s3 = inputStr[7]
newStr = s1+"-"+"0"+s2+"-"+"0"+s3
else:
s1 = inputStr[0:4]
s2 = inputStr[5:6]
s3 = inputStr[7:]
newStr = s1+"-"+"0"+s2+"-"+s3
date = datetime.strptime(newStr, "%Y-%m-%d")
return date
#主程序:
spark = SparkSession.builder.config(conf = SparkConf()).getOrCreate()
fields = [StructField("date", DateType(),False),StructField("county", StringType(),False),StructField("state", StringType(),False),
StructField("cases", IntegerType(),False),StructField("deaths", IntegerType(),False),]
schema = StructType(fields)
rdd0 = spark.sparkContext.textFile("/user/hadoop/us-counties.txt")
rdd1 = rdd0.map(lambda x:x.split("\t")).map(lambda p: Row(toDate(p[0]),p[1],p[2],int(p[3]),int(p[4])))
shemaUsInfo = spark.createDataFrame(rdd1,schema)
shemaUsInfo.createOrReplaceTempView("usInfo")
print('success') # 对程序是否完成进行判断,方便看出程序是否执行完毕


2.记录美国每天的累计确诊病例数和死亡病例数的做法是基于日期字段进行分组,并对确诊病例数(cases)和死亡病例数(deaths)进行汇总计算。
#1.计算每日的累计确诊病例数和死亡数
df = shemaUsInfo.groupBy("date").agg(func.sum("cases"),func.sum("deaths")).sort(shemaUsInfo["date"].asc())
#列重命名
df1 = df.withColumnRenamed("sum(cases)","cases").withColumnRenamed("sum(deaths)","deaths")
df1.repartition(1).write.json("result1.json") #写入hdfs
#注册为临时表供下一步使用
df1.createOrReplaceTempView("ustotal")
print('success') # 对程序是否完成进行判断,方便看出程序是否执行完毕

3.收集美国每日新增的确诊病例和死亡数据。由于每天的确诊增量等于当前日期的数据减去前一天的数据,在进行数据处理时可以采用自连接技术。具体而言,在查询设计中将通过比较t1.totalCases与t2.totalCases的差值来确定当日增量。
#2.计算每日较昨日的新增确诊病例数和死亡病例数
df2 = spark.sql("select t1.date,t1.cases-t2.cases as caseIncrease,t1.deaths-t2.deaths as deathIncrease from ustotal t1,ustotal t2 where t1.date = date_add(t2.date,1)")
df2.sort(df2["date"].asc()).repartition(1).write.json("result2.json") #写入hdfs
print('success') # 对程序是否完成进行判断,方便看出程序是否执行完毕

4.收集截止5.19日的数据,美国各州的累计确诊人数和死亡人数。首先提取5.19日的数据集,然后按州分组cases和deaths字段进行汇总统计。
#3.统计截止5.19日 美国各州的累计确诊人数和死亡人数
df3 = spark.sql("select date,state,sum(cases) as totalCases,sum(deaths) as totalDeaths,round(sum(deaths)/sum(cases),4) as deathRate from usInfo where date = to_date('2020-05-19','yyyy-MM-dd') group by date,state")
df3.sort(df3["totalCases"].desc()).repartition(1).write.json("result3.json") #写入hdfs
df3.createOrReplaceTempView("eachStateInfo")
print('success') # 对程序是否完成进行判断,方便看出程序是否执行完毕

统计截至5月19日时,在美国确诊人数最多且人口较多的十个州中进行统计;对3)的结果DF创建临时表后,并按照确诊人数从高到低排序,并筛选出人口较多的前十名州进行分析
#4.找出美国确诊最多的10个州
df4 = spark.sql("select date,state,totalCases from eachStateInfo order by totalCases desc limit 10")
df4.repartition(1).write.json("result4.json")
print('success') # 对程序是否完成进行判断,方便看出程序是否执行完毕

6.统计截至5月19日美国地区的死亡人数排行前十,请查询3)的结果数据框,并创建一个临时数据库表用于存储这些信息;然后按照死亡人数由高到低排序,并筛选出排名前十的州
#5.找出美国死亡最多的10个州
df5 = spark.sql("select date,state,totalDeaths from eachStateInfo order by totalDeaths desc limit 10")
df5.repartition(1).write.json("result5.json")
print('success') # 对程序是否完成进行判断,方便看出程序是否执行完毕

7.根据截止至5月19日的数据统计,在美国确诊人数最少的前十名州中
#6.找出美国确诊最少的10个州
df6 = spark.sql("select date,state,totalCases from eachStateInfo order by totalCases asc limit 10")
df6.repartition(1).write.json("result6.json")
print('success') # 对程序是否完成进行判断,方便看出程序是否执行完毕

8.统计截至5月19日(五点一九),美国各州中死亡人数最少的十个州是什么?首先需要根据结果DataFrame创建一个临时存储表(也称为工作表),然后按照死亡人数由低到高排序,并筛选出死亡人数最少的前十州。
#7.找出美国死亡最少的10个州
df7 = spark.sql("select date,state,totalDeaths from eachStateInfo order by totalDeaths asc limit 10")
df7.repartition(1).write.json("result7.json")
print('success') # 对程序是否完成进行判断,方便看出程序是否执行完毕

- 计算截至5月19日的美国及其各州的病亡率数据。通过预设公式(死亡人数除以确诊病例数),对步骤3所得出的DataFrame结果创建临时存储表,并进行计算处理。
#8.统计截止5.19全美和各州的病死率
df8 = spark.sql("select 1 as sign,date,'USA' as state,round(sum(totalDeaths)/sum(totalCases),4) as deathRate from eachStateInfo group by date union select 2 as sign,date,state,deathRate from eachStateInfo").cache()
df8.sort(df8["sign"].asc(),df8["deathRate"].desc()).repartition(1).write.json("result8.json")
print('success') # 对程序是否完成进行判断,方便看出程序是否执行完毕

10.在终端查看文件是否都存在
./bin/hdfs dfs -ls /user/hadoop

四、将结果从HDFS下载至本地文件系统
将本地文件系统中的结果目录设置于/hadoop目录下,并生成8个子目录命名为result1至result8
cd ~/result
mkdir result
cd result
mkdir result1 result2 result3 result4 result5 result6 result7 result8

2.将HDFS文件系统中的文件下载至本地文件系统。
./bin/hdfs dfs -get /user/hadoop/result1.json/*.json /home/hadoop/result/result1/
./bin/hdfs dfs -get /user/hadoop/result2.json/*.json /home/hadoop/result/result2/
# 对于其他文件以此类推,将文件路径修改即可


3.查看下载好的文件

五、数据可视化
1.安装第三方可视化工具pyecharts
pip install pyecharts

- 在Jupyter Notebook环境中生成名为showdata的文件,并将其代码分布部分保存至该文件中。同时将其对应的JSON数据转化为HTML格式展示。
(1)导入所需外置库
from pyecharts import options as opts
from pyecharts.charts import Bar
from pyecharts.charts import Line
from pyecharts.components import Table
from pyecharts.charts import WordCloud
from pyecharts.charts import Pie
from pyecharts.charts import Funnel
from pyecharts.charts import Scatter
from pyecharts.charts import PictorialBar
from pyecharts.options import ComponentTitleOpts
from pyecharts.globals import SymbolType
import json
print('success') # 对程序是否完成进行判断,方便看出程序是否执行完毕

(2)出每日的累计确诊病例数和死亡数——>双柱状图
#1.画出每日的累计确诊病例数和死亡数——>双柱状图
def drawChart_1(index):
root = "/home/hadoop/result/result" + str(index) +"/part-00000.json"
date = []
cases = []
deaths = []
with open(root, 'r') as f:
while True:
line = f.readline()
if not line: # 到 EOF,返回空字符串,则终止循环
break
js = json.loads(line)
date.append(str(js['date']))
cases.append(int(js['cases']))
deaths.append(int(js['deaths']))
d = (
Bar()
.add_xaxis(date)
.add_yaxis("累计确诊人数", cases, stack="stack1")
.add_yaxis("累计死亡人数", deaths, stack="stack1")
.set_series_opts(label_opts=opts.LabelOpts(is_show=False))
.set_global_opts(title_opts=opts.TitleOpts(title="美国每日累计确诊和死亡人数"))
.render("/home/hadoop/result/result1/result1.html")
)
print('success') # 对程序是否完成进行判断,方便看出程序是否执行完毕

(3)画出每日的新增确诊病例数和死亡数——>折线图
#2.画出每日的新增确诊病例数和死亡数——>折线图
def drawChart_2(index):
root = "/home/hadoop/result/result" + str(index) +"/part-00000.json"
date = []
cases = []
deaths = []
with open(root, 'r') as f:
while True:
line = f.readline()
if not line: # 到 EOF,返回空字符串,则终止循环
break
js = json.loads(line)
date.append(str(js['date']))
cases.append(int(js['caseIncrease']))
deaths.append(int(js['deathIncrease']))
(
Line(init_opts=opts.InitOpts(width="1600px", height="800px"))
.add_xaxis(xaxis_data=date)
.add_yaxis(
series_name="新增确诊",
y_axis=cases,
markpoint_opts=opts.MarkPointOpts(
data=[
opts.MarkPointItem(type_="max", name="最大值")
]
),
markline_opts=opts.MarkLineOpts(
data=[opts.MarkLineItem(type_="average", name="平均值")]
),
)
.set_global_opts(
title_opts=opts.TitleOpts(title="美国每日新增确诊折线图", subtitle=""),
tooltip_opts=opts.TooltipOpts(trigger="axis"),
toolbox_opts=opts.ToolboxOpts(is_show=True),
xaxis_opts=opts.AxisOpts(type_="category", boundary_gap=False),
)
.render("/home/hadoop/result/result2/result1.html")
)
(
Line(init_opts=opts.InitOpts(width="1600px", height="800px"))
.add_xaxis(xaxis_data=date)
.add_yaxis(
series_name="新增死亡",
y_axis=deaths,
markpoint_opts=opts.MarkPointOpts(
data=[opts.MarkPointItem(type_="max", name="最大值")]
),
markline_opts=opts.MarkLineOpts(
data=[
opts.MarkLineItem(type_="average", name="平均值"),
opts.MarkLineItem(symbol="none", x="90%", y="max"),
opts.MarkLineItem(symbol="circle", type_="max", name="最高点"),
]
),
)
.set_global_opts(
title_opts=opts.TitleOpts(title="美国每日新增死亡折线图", subtitle=""),
tooltip_opts=opts.TooltipOpts(trigger="axis"),
toolbox_opts=opts.ToolboxOpts(is_show=True),
xaxis_opts=opts.AxisOpts(type_="category", boundary_gap=False),
)
.render("/home/hadoop/result/result2/result2.html")
)
print('success') # 对程序是否完成进行判断,方便看出程序是否执行完毕

(4)画出截止5.19,美国各州累计确诊、死亡人数和病死率—>表格
#3.画出截止5.19,美国各州累计确诊、死亡人数和病死率--->表格
def drawChart_3(index):
root = "/home/hadoop/result/result" + str(index) +"/part-00000.json"
allState = []
with open(root, 'r') as f:
while True:
line = f.readline()
if not line: # 到 EOF,返回空字符串,则终止循环
break
js = json.loads(line)
row = []
row.append(str(js['state']))
row.append(int(js['totalCases']))
row.append(int(js['totalDeaths']))
row.append(float(js['deathRate']))
allState.append(row)
table = Table()
headers = ["State name", "Total cases", "Total deaths", "Death rate"]
rows = allState
table.add(headers, rows)
table.set_global_opts(
title_opts=ComponentTitleOpts(title="美国各州疫情一览", subtitle="")
)
table.render("/home/hadoop/result/result3/result1.html")
print('success') # 对程序是否完成进行判断,方便看出程序是否执行完毕

(5)画出美国确诊最多的10个州——>词云图
#4.画出美国确诊最多的10个州——>词云图
def drawChart_4(index):
root = "/home/hadoop/result/result" + str(index) +"/part-00000.json"
data = []
with open(root, 'r') as f:
while True:
line = f.readline()
if not line: # 到 EOF,返回空字符串,则终止循环
break
js = json.loads(line)
row=(str(js['state']),int(js['totalCases']))
data.append(row)
c = (
WordCloud()
.add("", data, word_size_range=[20, 100], shape=SymbolType.DIAMOND)
.set_global_opts(title_opts=opts.TitleOpts(title="美国各州确诊Top10"))
.render("/home/hadoop/result/result4/result1.html")
)
print('success') # 对程序是否完成进行判断,方便看出程序是否执行完毕

(6)画出美国死亡最多的10个州——>象柱状图
def drawChart_5(index):
root = "/home/hadoop/result/result" + str(index) +"/part-00000.json"
state = []
totalDeath = []
with open(root, 'r') as f:
while True:
line = f.readline()
if not line: # 到 EOF,返回空字符串,则终止循环
break
js = json.loads(line)
state.insert(0,str(js['state']))
totalDeath.insert(0,int(js['totalDeaths']))
c = (
PictorialBar()
.add_xaxis(state)
.add_yaxis(
"",
totalDeath,
label_opts=opts.LabelOpts(is_show=False),
symbol_size=18,
symbol_repeat="fixed",
symbol_offset=[0, 0],
is_symbol_clip=True,
symbol=SymbolType.ROUND_RECT,
)
.reversal_axis()
.set_global_opts(
title_opts=opts.TitleOpts(title="PictorialBar-美国各州死亡人数Top10"),
xaxis_opts=opts.AxisOpts(is_show=False),
yaxis_opts=opts.AxisOpts(
axistick_opts=opts.AxisTickOpts(is_show=False),
axisline_opts=opts.AxisLineOpts(
linestyle_opts=opts.LineStyleOpts(opacity=0)
),
),
)
.render("/home/hadoop/result/result5/result1.html")
)
print('success') # 对程序是否完成进行判断,方便看出程序是否执行完毕

(7)找出美国确诊最少的10个州——>词云图
#6.找出美国确诊最少的10个州——>词云图
def drawChart_6(index):
root = "/home/hadoop/result/result" + str(index) +"/part-00000.json"
data = []
with open(root, 'r') as f:
while True:
line = f.readline()
if not line: # 到 EOF,返回空字符串,则终止循环
break
js = json.loads(line)
row=(str(js['state']),int(js['totalCases']))
data.append(row)
c = (
WordCloud()
.add("", data, word_size_range=[100, 20], shape=SymbolType.DIAMOND)
.set_global_opts(title_opts=opts.TitleOpts(title="美国各州确诊最少的10个州"))
.render("/home/hadoop/result/result6/result1.html")
)
print('success') # 对程序是否完成进行判断,方便看出程序是否执行完毕

(8)找出美国死亡最少的10个州——>漏斗图
#7.找出美国死亡最少的10个州——>漏斗图
def drawChart_7(index):
root = "/home/hadoop/result/result" + str(index) +"/part-00000.json"
data = []
with open(root, 'r') as f:
while True:
line = f.readline()
if not line: # 到 EOF,返回空字符串,则终止循环
break
js = json.loads(line)
data.insert(0,[str(js['state']),int(js['totalDeaths'])])
c = (
Funnel()
.add(
"State",
data,
sort_="ascending",
label_opts=opts.LabelOpts(position="inside"),
)
.set_global_opts(title_opts=opts.TitleOpts(title=""))
.render("/home/hadoop/result/result7/result1.html")
)
print('success') # 对程序是否完成进行判断,方便看出程序是否执行完毕

(9)美国的病死率—>饼状图
#8.美国的病死率--->饼状图
def drawChart_8(index):
root = "/home/hadoop/result/result" + str(index) +"/part-00000.json"
values = []
with open(root, 'r') as f:
while True:
line = f.readline()
if not line: # 到 EOF,返回空字符串,则终止循环
break
js = json.loads(line)
if str(js['state'])=="USA":
values.append(["Death(%)",round(float(js['deathRate'])*100,2)])
values.append(["No-Death(%)",100-round(float(js['deathRate'])*100,2)])
c = (
Pie()
.add("", values)
.set_colors(["blcak","orange"])
.set_global_opts(title_opts=opts.TitleOpts(title="全美的病死率"))
.set_series_opts(label_opts=opts.LabelOpts(formatter="{b}: {c}"))
.render("/home/hadoop/result/result8/result1.html")
)
print('success') # 对程序是否完成进行判断,方便看出程序是否执行完毕

(10)可视化主程序
#可视化主程序:
index = 1
while index<9:
funcStr = "drawChart_" + str(index)
eval(funcStr)(index)
index+=1
print('success') # 对程序是否完成进行判断,方便看出程序是否执行完毕

- 分析相关问题所生成的HTML图表
(1)美国每天新增病例数量与死亡病例数量对比——使用双柱状图展示

(2)美国每日的新增确诊病例数——>折线图

(3)美国每日的新增死亡病例数——>折线图

(4)截止5.19,美国各州累计确诊、死亡人数和病死率—>表格




(5)截止5.19,美国累计确诊人数前10的州—>词云图

(6)截止5.19,美国累计死亡人数前10的州—>象柱状图

(7)截止5.19,美国累计确诊人数最少的10个州—>词云图

(8)截止5.19,美国累计死亡人数最少的10个州—>漏斗图

(9)截止5.19,美国的病死率—>饼状图

五、总结
在本次实验中实现了我们这一学期所学内容的融会贯通,在前期阶段我们加强了Python语言的基础巩固工作,并对之前的知识点进行了系统性的复习与强化记忆。中期阶段我们重点学习并掌握了Spark的相关知识,并将其与Python语言相结合运用到实际操作中,在实操过程中运用Pyspark对TXT文件进行了数据处理,并基于DataFrame对数据进行了深入分析与挖掘工作。具体而言,在实验过程中我们采用了RDD这种分布式计算模型来对TXT文件进行处理,并基于DataFrame对数据进行了多维度分析与挖掘工作。通过这些数据分析工作生成的JSON文件即为我们最终所需的结果数据。随后我们又对生成的JSON文件进行了可视化处理,在这个过程中我们首次尝试并使用了ECharts这个可视化第三方库工具(注:ECharts即为Pyecharts的一个中文版本工具)。由于这是我在本次实验中首次接触并尝试使用的工具,在操作过程中尚不熟练掌握其中的一些基本用法与技巧,在今后的学习中还需进一步提升和完善自己。此外在这个实验过程中遇到了一些挑战性问题:例如在系统环境中运行软件时出现了安装问题导致必须重新安装系统;另外在执行Python脚本运行操作时出现代码报错现象经检查发现是配置文件配置错误所致;最终经过详细分析与调整最终找到了问题根源并解决了这些问题所在从而顺利完成整个实验任务过程
六、参考材料
[1] http://dblab.xmu.edu.cn/blog/2636-2/
注:本篇文章是基于林子雨老师博客的文章,经本人实操后发表。
