2020年美国新冠肺炎疫情数据分析
2020年美国新冠肺炎疫情数据分析
-
一、实验平台
- 二、数据集
-
-
- 数据集概述
-
-
- 格式转换操作
-
- 通过HDFS文件系统将文件上传至目标位置
-
第三章 利用Spark开展数据分析工作
-
第一部分 完整的代码实现
- 第一步 配置数据读取路径并生成DataFrame对象
- 第二步 建立数据存储机制以支持后续分析需求
- 第三步 执行一系列数据分析操作以提取关键信息
- 第四步 确定最终的结果输出位置及格式要求
-
四、数据可视化
-
- 1.可视化工具选择与代码
- 2.结果图表展示
-
五、参考资料
-
本案例基于2020年美国新冠肺炎疫情数据构建了数据集,并采用Python作为编程工具。运用Spark框架对数据展开分析,并并对分析结果进行了直观展示。
一、实验环境
- Linux: Ubuntu 16.04
- Hadoop:3.1.3
- Python: 3.5
- Spark: 2.4.0
- Jupyter Notebook
二、数据集
1.数据集介绍
本次作业所使用的 数据 集源自于 Kaggle 平台 提供 的 美国新冠肺炎疫情相关 数据 集合 , 该 数据 集按照 data 文件 us-counties.csv 的 形式 进行 组织 , 其中 记录 了从 美国 确诊 初探 病例 至 2020 年 5 月 19 日 为止 的 相关 信息 。这些记录涉及以下字段:
| 字段名称 | 字段含义 | 例子 |
|---|---|---|
| date | 日期 | 2020/1/21;2020/1/22 |
| county | 区县(州的下一级单位) | Snohomish |
| state | 州 | Washington |
| cases | 截止该日期该区县的累计确诊人数 | 1,2,3… |
| deaths | 截止该日期该区县的累计死亡人数 | 1,2,3… |

2.格式转换
原始数据集采用.csv格式组织结构。为了便于spark系统解析生成RDD或DataFrame对象的需求, 首先通过python编程语言实现数据转码过程, 将原始数据集从.csv格式转为.txt格式, 生成us-counties.txt文件存放存储。具体的python脚本代码已包含在toTxt.py文件中, 具体实现细节请参考附录部分。
import pandas as pd
#.csv->.txt
data = pd.read_csv('/home/hadoop/us-counties.csv')
with open('/home/hadoop/us-counties.txt','a+',encoding='utf-8') as f:
for line in data.values:
f.write((str(line[0])+'\t'+str(line[1])+'\t'
+str(line[2])+'\t'+str(line[3])+'\t'+str(line[4])+'\n'))
首先,请将us-counties.csv数据集放置于Ubuntu系统中的/usr/local/hadoop这一特定路径中,并完成相应的数据格式转换。随后,在该特定路径中进行进一步的数据处理工作以生成所需的分析结果。

- 使用jupyter notebook运行
toTxt.py。

- 查看运行结果,/usr/local/hadoop下已生成us-counties.txt文件

3.将文件上传至HDFS文件系统中
- 启动Hadoop,并使用jps命令查看启动结果
./sbin/start-dfs.sh

位于HDFS文件系统内,在用户目录下的hadoop目录中进行数据处理时,默认会生成一个空的空目录结构,并且后续产生的运行结果数据将被存储在此处。
./bin/hdfs dfs -mkdir -p /user/hadoop
./bin/hdfs dfs -put /usr/local/hadoop/data/us-counties.txt /user/hadoop

三、使用Spark对数据进行分析
这里采用python作为编程语言。
1.完整代码
本部分操作的完整实验代码存放在了analyst.py中,具体如下
from pyspark import SparkConf,SparkContext
from pyspark.sql import Row
from pyspark.sql.types import *
from pyspark.sql import SparkSession
from datetime import datetime
import pyspark.sql.functions as func
def toDate(inputStr):
newStr = ""
if len(inputStr) == 8:
s1 = inputStr[0:4]
s2 = inputStr[5:6]
s3 = inputStr[7]
newStr = s1+"-"+"0"+s2+"-"+"0"+s3
else:
s1 = inputStr[0:4]
s2 = inputStr[5:6]
s3 = inputStr[7:]
newStr = s1+"-"+"0"+s2+"-"+s3
date = datetime.strptime(newStr, "%Y-%m-%d")
return date
#主程序:
spark = SparkSession.builder.config(conf = SparkConf()).getOrCreate()
fields = [StructField("date", DateType(),False),StructField("county", StringType(),False),StructField("state", StringType(),False),
StructField("cases", IntegerType(),False),StructField("deaths", IntegerType(),False),]
schema = StructType(fields)
rdd0 = spark.sparkContext.textFile("/user/hadoop/us-counties.txt")
rdd1 = rdd0.map(lambda x:x.split("\t")).map(lambda p: Row(toDate(p[0]),p[1],p[2],int(p[3]),int(p[4])))
shemaUsInfo = spark.createDataFrame(rdd1,schema)
shemaUsInfo.createOrReplaceTempView("usInfo")
#1.计算每日的累计确诊病例数和死亡数
df = shemaUsInfo.groupBy("date").agg(func.sum("cases"),func.sum("deaths")).sort(shemaUsInfo["date"].asc())
#列重命名
df1 = df.withColumnRenamed("sum(cases)","cases").withColumnRenamed("sum(deaths)","deaths")
df1.repartition(1).write.json("result1.json") #写入hdfs
#注册为临时表供下一步使用
df1.createOrReplaceTempView("ustotal")
#2.计算每日较昨日的新增确诊病例数和死亡病例数
df2 = spark.sql("select t1.date,t1.cases-t2.cases as caseIncrease,t1.deaths-t2.deaths as deathIncrease from ustotal t1,ustotal t2 where t1.date = date_add(t2.date,1)")
df2.sort(df2["date"].asc()).repartition(1).write.json("result2.json") #写入hdfs
#3.统计截止5.19日 美国各州的累计确诊人数和死亡人数
df3 = spark.sql("select date,state,sum(cases) as totalCases,sum(deaths) as totalDeaths,round(sum(deaths)/sum(cases),4) as deathRate from usInfo where date = to_date('2020-05-19','yyyy-MM-dd') group by date,state")
df3.sort(df3["totalCases"].desc()).repartition(1).write.json("result3.json") #写入hdfs
df3.createOrReplaceTempView("eachStateInfo")
#4.找出美国确诊最多的10个州
df4 = spark.sql("select date,state,totalCases from eachStateInfo order by totalCases desc limit 10")
df4.repartition(1).write.json("result4.json")
#5.找出美国死亡最多的10个州
df5 = spark.sql("select date,state,totalDeaths from eachStateInfo order by totalDeaths desc limit 10")
df5.repartition(1).write.json("result5.json")
#6.找出美国确诊最少的10个州
df6 = spark.sql("select date,state,totalCases from eachStateInfo order by totalCases asc limit 10")
df6.repartition(1).write.json("result6.json")
#7.找出美国死亡最少的10个州
df7 = spark.sql("select date,state,totalDeaths from eachStateInfo order by totalDeaths asc limit 10")
df7.repartition(1).write.json("result7.json")
#8.统计截止5.19全美和各州的病死率
df8 = spark.sql("select 1 as sign,date,'USA' as state,round(sum(totalDeaths)/sum(totalCases),4) as deathRate from eachStateInfo group by date union select 2 as sign,date,state,deathRate from eachStateInfo").cache()
df8.sort(df8["sign"].asc(),df8["deathRate"].desc()).repartition(1).write.json("result8.json")
2.读取文件生成DataFrame
上面已经提供了完整的代码。
我们对这部分代码进行了简要介绍。
首先说明了如何通过spark读取源文件生成DataFrame以实现后续的数据分析工作更为便捷。
此外需要注意的是 在使用Jupyter Notebook进行PySpark程序调试时 有些运行后的结果无法直接在网页界面中查看 可以通过print(‘success!’)语句执行后会显示成功信息。
本部分的具体实现功能则存储在analyst.py文件中
from pyspark import SparkConf,SparkContext
from pyspark.sql import Row
from pyspark.sql.types import *
from pyspark.sql import SparkSession
from datetime import datetime
import pyspark.sql.functions as func
def toDate(inputStr):
newStr = ""
if len(inputStr) == 8:
s1 = inputStr[0:4]
s2 = inputStr[5:6]
s3 = inputStr[7]
newStr = s1+"-"+"0"+s2+"-"+"0"+s3
else:
s1 = inputStr[0:4]
s2 = inputStr[5:6]
s3 = inputStr[7:]
newStr = s1+"-"+"0"+s2+"-"+s3
date = datetime.strptime(newStr, "%Y-%m-%d")
return date
#主程序:
spark = SparkSession.builder.config(conf = SparkConf()).getOrCreate()
fields = [StructField("date", DateType(),False),StructField("county", StringType(),False),StructField("state", StringType(),False),
StructField("cases", IntegerType(),False),StructField("deaths", IntegerType(),False),]
schema = StructType(fields)
rdd0 = spark.sparkContext.textFile("/user/hadoop/us-counties.txt")
rdd1 = rdd0.map(lambda x:x.split("\t")).map(lambda p: Row(toDate(p[0]),p[1],p[2],int(p[3]),int(p[4])))
shemaUsInfo = spark.createDataFrame(rdd1,schema)
shemaUsInfo.createOrReplaceTempView("usInfo")
将以上代码在jupyter notebook中运行,即可生成DataFrame,如下图所示。


3.进行数据分析
本实验主要统计以下8个指标,分别是:
计算美国每日截止时的累计确诊病例数及死亡人数的方法是以日期字段为分组依据,并对cases和deaths字段进行聚合计算
#1.计算每日的累计确诊病例数和死亡数
df = shemaUsInfo.groupBy("date").agg(func.sum("cases"),func.sum("deaths")).sort(shemaUsInfo["date"].asc())
#列重命名
df1 = df.withColumnRenamed("sum(cases)","cases").withColumnRenamed("sum(deaths)","deaths")
df1.repartition(1).write.json("result1.json") #写入hdfs
#注册为临时表供下一步使用
df1.createOrReplaceTempView("ustotal")
运行结果如下图所示:

收集美国每天的增量数据(即每日的新增确诊人数和新增死亡人数)。由于增量等于今天的数量减去昨天的数量(即增量=今日值-昨日值),因此可以考虑通过自连接技术来处理这些数据,并满足t1.date比t2.date晚一天的条件(即满足t1日期等于t2日期加一日),从而计算出当天的确诊增量和死亡增量。
#2.计算每日较昨日的新增确诊病例数和死亡病例数
df2 = spark.sql("select t1.date,t1.cases-t2.cases as caseIncrease,t1.deaths-t2.deaths as deathIncrease from ustotal t1,ustotal t2 where t1.date = date_add(t2.date,1)")
df2.sort(df2["date"].asc()).repartition(1).write.json("result2.json") #写入hdfs
运行结果如下图所示:

- 统计截至5月19日为止, 美国各州的新增确诊病例总数和死亡病例总数. 首先提取5月19日的数据集, 然后按state字段进行分组处理: 分别汇总cases字段的数据, 并计算deaths字段的总值.
#3.统计截止5.19日 美国各州的累计确诊人数和死亡人数
df3 = spark.sql("select date,state,sum(cases) as totalCases,sum(deaths) as totalDeaths,round(sum(deaths)/sum(cases),4) as deathRate from usInfo where date = to_date('2020-05-19','yyyy-MM-dd') group by date,state")
df3.sort(df3["totalCases"].desc()).repartition(1).write.json("result3.json") #写入hdfs
df3.createOrReplaceTempView("eachStateInfo")
运行结果如下图所示:

- 统计截至5月19日为止的数据时发现,在美国各州中确诊人数最多、排位前十的十个州占比较大。对第3步所得结果中的DataFrame进行临时表注册后,在按照确诊人数降序排序的基础上选取其中排名靠前的十一个地区
#4.找出美国确诊最多的10个州
df4 = spark.sql("select date,state,totalCases from eachStateInfo order by totalCases desc limit 10")
df4.repartition(1).write.json("result4.json")
运行结果如下图所示:

统计截至5月19日时,在美国各州中死亡人数最多且州数最多的十个州共有多少?首先将步骤3的结果DataFrame创建为临时存储表,并按照死亡人数从高到低排序。接着从中筛选出排名前十的州。
#5.找出美国死亡最多的10个州
df5 = spark.sql("select date,state,totalDeaths from eachStateInfo order by totalDeaths desc limit 10")
df5.repartition(1).write.json("result5.json")
运行结果如下图所示:

计算截至5月19日时,在美国确诊人数最少的十个州。首先将该结果DataFrame按确诊人数进行升序排序;接着从中选取前十个州,并将其创建为一个临时存储表。
#6.找出美国确诊最少的10个州
df6 = spark.sql("select date,state,totalCases from eachStateInfo order by totalCases asc limit 10")
df6.repartition(1).write.json("result6.json")
运行结果如下图所示:

截至5月19日的数据统计显示美国死亡人数最少的前十名州。针对步骤3的结果数据框进行临时表注册后按照死亡人数从低到高排序并选取前十名州进行分析
#7.找出美国死亡最少的10个州
df7 = spark.sql("select date,state,totalDeaths from eachStateInfo order by totalDeaths asc limit 10")
df7.repartition(1).write.json("result7.json")
运行结果如下图所示:

统计截至5月19日为止的数据时,请注意全美及各州地区的病发率数据统计情况:计算方法是将死亡病例数量与确诊病例总数相除;随后请将第(3)步所得的DataFrame对象注册至临时存储表中,并应用上述公式进行计算
#8.统计截止5.19全美和各州的病死率
df8 = spark.sql("select 1 as sign,date,'USA' as state,round(sum(totalDeaths)/sum(totalCases),4) as deathRate from eachStateInfo group by date union select 2 as sign,date,state,deathRate from eachStateInfo").cache()
df8.sort(df8["sign"].asc(),df8["deathRate"].desc()).repartition(1).write.json("result8.json")
运行结果如下图所示:

4.结果文件
以Spark计算所得的Json格式存储的文件
./bin/hdfs dfs -get /user/hadoop/result1.json/*.json /home/hadoop/result/result1
...
除了其他结果文件外,请应用相同的命令,并只需更改路径即可。

四、数据可视化
1.可视化工具选择与代码
在本案例中,我们采用了Python第三方库pyecharts来进行数据可视化。
pip install pyecharts
安装过程如下图所示:

具体可视化实现代码组织于showdata.py文件中。具体代码如下:
from pyecharts import options as opts
from pyecharts.charts import Bar
from pyecharts.charts import Line
from pyecharts.components import Table
from pyecharts.charts import WordCloud
from pyecharts.charts import Pie
from pyecharts.charts import Funnel
from pyecharts.charts import Scatter
from pyecharts.charts import PictorialBar
from pyecharts.options import ComponentTitleOpts
from pyecharts.globals import SymbolType
import json
#1.画出每日的累计确诊病例数和死亡数——>双柱状图
def drawChart_1(index):
root = "/home/hadoop/result/result" + str(index) +"/part-00000.json"
date = []
cases = []
deaths = []
with open(root, 'r') as f:
while True:
line = f.readline()
if not line: # 到 EOF,返回空字符串,则终止循环
break
js = json.loads(line)
date.append(str(js['date']))
cases.append(int(js['cases']))
deaths.append(int(js['deaths']))
d = (
Bar()
.add_xaxis(date)
.add_yaxis("累计确诊人数", cases, stack="stack1")
.add_yaxis("累计死亡人数", deaths, stack="stack1")
.set_series_opts(label_opts=opts.LabelOpts(is_show=False))
.set_global_opts(title_opts=opts.TitleOpts(title="美国每日累计确诊和死亡人数"))
.render("/home/hadoop/result/result1/result1.html")
)
#2.画出每日的新增确诊病例数和死亡数——>折线图
def drawChart_2(index):
root = "/home/hadoop/result/result" + str(index) +"/part-00000.json"
date = []
cases = []
deaths = []
with open(root, 'r') as f:
while True:
line = f.readline()
if not line: # 到 EOF,返回空字符串,则终止循环
break
js = json.loads(line)
date.append(str(js['date']))
cases.append(int(js['caseIncrease']))
deaths.append(int(js['deathIncrease']))
(
Line(init_opts=opts.InitOpts(width="1600px", height="800px"))
.add_xaxis(xaxis_data=date)
.add_yaxis(
series_name="新增确诊",
y_axis=cases,
markpoint_opts=opts.MarkPointOpts(
data=[
opts.MarkPointItem(type_="max", name="最大值")
]
),
markline_opts=opts.MarkLineOpts(
data=[opts.MarkLineItem(type_="average", name="平均值")]
),
)
.set_global_opts(
title_opts=opts.TitleOpts(title="美国每日新增确诊折线图", subtitle=""),
tooltip_opts=opts.TooltipOpts(trigger="axis"),
toolbox_opts=opts.ToolboxOpts(is_show=True),
xaxis_opts=opts.AxisOpts(type_="category", boundary_gap=False),
)
.render("/home/hadoop/result/result2/result1.html")
)
(
Line(init_opts=opts.InitOpts(width="1600px", height="800px"))
.add_xaxis(xaxis_data=date)
.add_yaxis(
series_name="新增死亡",
y_axis=deaths,
markpoint_opts=opts.MarkPointOpts(
data=[opts.MarkPointItem(type_="max", name="最大值")]
),
markline_opts=opts.MarkLineOpts(
data=[
opts.MarkLineItem(type_="average", name="平均值"),
opts.MarkLineItem(symbol="none", x="90%", y="max"),
opts.MarkLineItem(symbol="circle", type_="max", name="最高点"),
]
),
)
.set_global_opts(
title_opts=opts.TitleOpts(title="美国每日新增死亡折线图", subtitle=""),
tooltip_opts=opts.TooltipOpts(trigger="axis"),
toolbox_opts=opts.ToolboxOpts(is_show=True),
xaxis_opts=opts.AxisOpts(type_="category", boundary_gap=False),
)
.render("/home/hadoop/result/result2/result2.html")
)
#3.画出截止5.19,美国各州累计确诊、死亡人数和病死率--->表格
def drawChart_3(index):
root = "/home/hadoop/result/result" + str(index) +"/part-00000.json"
allState = []
with open(root, 'r') as f:
while True:
line = f.readline()
if not line: # 到 EOF,返回空字符串,则终止循环
break
js = json.loads(line)
row = []
row.append(str(js['state']))
row.append(int(js['totalCases']))
row.append(int(js['totalDeaths']))
row.append(float(js['deathRate']))
allState.append(row)
table = Table()
headers = ["State name", "Total cases", "Total deaths", "Death rate"]
rows = allState
table.add(headers, rows)
table.set_global_opts(
title_opts=ComponentTitleOpts(title="美国各州疫情一览", subtitle="")
)
table.render("/home/hadoop/result/result3/result1.html")
#4.画出美国确诊最多的10个州——>词云图
def drawChart_4(index):
root = "/home/hadoop/result/result" + str(index) +"/part-00000.json"
data = []
with open(root, 'r') as f:
while True:
line = f.readline()
if not line: # 到 EOF,返回空字符串,则终止循环
break
js = json.loads(line)
row=(str(js['state']),int(js['totalCases']))
data.append(row)
c = (
WordCloud()
.add("", data, word_size_range=[20, 100], shape=SymbolType.DIAMOND)
.set_global_opts(title_opts=opts.TitleOpts(title="美国各州确诊Top10"))
.render("/home/hadoop/result/result4/result1.html")
)
#5.画出美国死亡最多的10个州——>象柱状图
def drawChart_5(index):
root = "/home/hadoop/result/result" + str(index) +"/part-00000.json"
state = []
totalDeath = []
with open(root, 'r') as f:
while True:
line = f.readline()
if not line: # 到 EOF,返回空字符串,则终止循环
break
js = json.loads(line)
state.insert(0,str(js['state']))
totalDeath.insert(0,int(js['totalDeaths']))
c = (
PictorialBar()
.add_xaxis(state)
.add_yaxis(
"",
totalDeath,
label_opts=opts.LabelOpts(is_show=False),
symbol_size=18,
symbol_repeat="fixed",
symbol_offset=[0, 0],
is_symbol_clip=True,
symbol=SymbolType.ROUND_RECT,
)
.reversal_axis()
.set_global_opts(
title_opts=opts.TitleOpts(title="PictorialBar-美国各州死亡人数Top10"),
xaxis_opts=opts.AxisOpts(is_show=False),
yaxis_opts=opts.AxisOpts(
axistick_opts=opts.AxisTickOpts(is_show=False),
axisline_opts=opts.AxisLineOpts(
linestyle_opts=opts.LineStyleOpts(opacity=0)
),
),
)
.render("/home/hadoop/result/result5/result1.html")
)
#6.找出美国确诊最少的10个州——>词云图
def drawChart_6(index):
root = "/home/hadoop/result/result" + str(index) +"/part-00000.json"
data = []
with open(root, 'r') as f:
while True:
line = f.readline()
if not line: # 到 EOF,返回空字符串,则终止循环
break
js = json.loads(line)
row=(str(js['state']),int(js['totalCases']))
data.append(row)
c = (
WordCloud()
.add("", data, word_size_range=[100, 20], shape=SymbolType.DIAMOND)
.set_global_opts(title_opts=opts.TitleOpts(title="美国各州确诊最少的10个州"))
.render("/home/hadoop/result/result6/result1.html")
)
#7.找出美国死亡最少的10个州——>漏斗图
def drawChart_7(index):
root = "/home/hadoop/result/result" + str(index) +"/part-00000.json"
data = []
with open(root, 'r') as f:
while True:
line = f.readline()
if not line: # 到 EOF,返回空字符串,则终止循环
break
js = json.loads(line)
data.insert(0,[str(js['state']),int(js['totalDeaths'])])
c = (
Funnel()
.add(
"State",
data,
sort_="ascending",
label_opts=opts.LabelOpts(position="inside"),
)
.set_global_opts(title_opts=opts.TitleOpts(title=""))
.render("/home/hadoop/result/result7/result1.html")
)
#8.美国的病死率--->饼状图
def drawChart_8(index):
root = "/home/hadoop/result/result" + str(index) +"/part-00000.json"
values = []
with open(root, 'r') as f:
while True:
line = f.readline()
if not line: # 到 EOF,返回空字符串,则终止循环
break
js = json.loads(line)
if str(js['state'])=="USA":
values.append(["Death(%)",round(float(js['deathRate'])*100,2)])
values.append(["No-Death(%)",100-round(float(js['deathRate'])*100,2)])
c = (
Pie()
.add("", values)
.set_colors(["blcak","orange"])
.set_global_opts(title_opts=opts.TitleOpts(title="全美的病死率"))
.set_series_opts(label_opts=opts.LabelOpts(formatter="{b}: {c}"))
.render("/home/hadoop/result/result8/result1.html")
)
#可视化主程序:
index = 1
while index<9:
funcStr = "drawChart_" + str(index)
eval(funcStr)(index)
index+=1
在jupyter notebook中运行showdata.py,如下图所示:








2.结果图表展示
可视化结果采用.html格式进行展示。其中result1的结果展示图文件位置为"/home/hadoop/result/result1/result1.html";result2的结果展示图文件位置为"/home/hadoop/result/result2/result1.html";其余类似情况均通过递推完成。具体示例图件如下:
- 美国每日的累计确诊病例数和死亡数——>双柱状图

- 美国每日的新增确诊病例数——>折线图

- 美国每日的新增死亡病例数——>折线图

- 截至2020.5.19,美国各州累计确诊、死亡人数和病死率—>表格

- 截至2020.5.19,美国累计确诊人数前10的州—>词云图

- 截至2020.5.19,美国累计死亡人数前10的州—>象柱状图

- 截至2020.5.19,美国累计确诊人数最少的10个州—>词云图

- 截至2020.5.19,美国累计死亡人数最少的10个州—>漏斗图

- 截至2020.5.19,美国的病死率—>饼状图

