Advertisement

2020年美国新冠肺炎疫情数据分析

阅读量:

2020年美国新冠肺炎疫情数据分析

1

3

5

1

本研究基于2020年美国新冠肺炎疫情数据构建了数据集,并采用Python语言进行开发。通过Spark框架完成数据分析工作,并实现了数据分析结果的可视化展示。

一、实验环境

(1)基于Linux的操作系统中使用的是Ubuntu 16.04版本。
(2)使用的是Hadoop平台的版本号为2.x系列中的具体版本。
(3)基于Python语言的版本为3.x系列中的具体版本。
(4)利用Spark框架运行的是版本号为 Spark 2.x系列中的具体版本。
(5)支持的Jupyter Notebook平台。

二、数据集

2.1 数据集下载

字段名称 字段含义 例子
date 日期 从YYYY年MM月DD日开始
county 区县 Snohomish County
state 州 Washington State
cases 确诊病例截至某日期在特定区县内的总数量 某区某日的确诊病例总数为X例
deaths 确诊死亡病例截至某日期在特定区县内的总数量 某区某日的确诊死亡病例总数为Y例

在这里插入图片描述

2.2 格式转换
原始数据集采用.csv格式存储。为了便于Spark程序解析并生成RDD或DataFrame结构, 首先将原始数据集转换为txt格式, 并命名为us-counties.txt。该转换过程通过Python编程语言实现, 具体编码工作场所为Jupyter Notebook环境, 详细编码内容已包含于toTxt.ipynb文件中。具体编码如下:

复制代码
    import pandas as pd
    
    #.csv ->.txt
    
    data = pd.read_csv('/home/hadoop/us-counties.csv')
    with open('/home/hadoop/us-counties.txt','a+',encoding = 'utf-8') as f:
    	for line in data.values:
    		f.write((str(line[0])+'\t'+str(line[1])+'\t'+str(line[2])+'\t'+str(line[3])+'\t'+str(line[4])+'\n')

通过以下命令将本地文件系统中的" /home/hadoop/us-counties.txt "传输至HDFS存储位置" /user/hadoop(us_counties .txt ) "的具体操作步骤请参考下文

复制代码
    ./bin/hdfs dfs -put /home/hadoop/us-counties.txt /user/hadoop

三、使用Spark对数据进行分析

完整代码实现

详细说明

复制代码
    from pyspark import SparkConf,SparkContext
    from pyspark.sql import Row
    from pyspark.sql.types import *
    from pyspark.sql import SparkSession
    from datetime import datetime
    import pyspark.sql.functions as func
     
    def toDate(inputStr):
    newStr = ""
    if len(inputStr) == 8:
        s1 = inputStr[0:4]
        s2 = inputStr[5:6]
        s3 = inputStr[7]
        newStr = s1+"-"+"0"+s2+"-"+"0"+s3
    else:
        s1 = inputStr[0:4]
        s2 = inputStr[5:6]
        s3 = inputStr[7:]
        newStr = s1+"-"+"0"+s2+"-"+s3
    date = datetime.strptime(newStr, "%Y-%m-%d")
    return date
     
     
     
    #主程序:
    spark = SparkSession.builder.config(conf = SparkConf()).getOrCreate()
     
    fields = [StructField("date", DateType(),False),StructField("county", StringType(),False),StructField("state", StringType(),False),
                    StructField("cases", IntegerType(),False),StructField("deaths", IntegerType(),False),]
    schema = StructType(fields)
     
    rdd0 = spark.sparkContext.textFile("/user/hadoop/us-counties.txt")
    rdd1 = rdd0.map(lambda x:x.split("\t")).map(lambda p: Row(toDate(p[0]),p[1],p[2],int(p[3]),int(p[4])))
     
    shemaUsInfo = spark.createDataFrame(rdd1,schema)
     
    shemaUsInfo.createOrReplaceTempView("usInfo")
     
    #1.计算每日的累计确诊病例数和死亡数
    df = shemaUsInfo.groupBy("date").agg(func.sum("cases"),func.sum("deaths")).sort(shemaUsInfo["date"].asc())
     
    #列重命名
    df1 = df.withColumnRenamed("sum(cases)","cases").withColumnRenamed("sum(deaths)","deaths")
    df1.repartition(1).write.json("r1.json")                               #写入hdfs
     
    #注册为临时表供下一步使用
    df1.createOrReplaceTempView("ustotal")
     
    #2.计算每日较昨日的新增确诊病例数和死亡病例数
    df2 = spark.sql("select t1.date,t1.cases-t2.cases as caseIncrease,t1.deaths-t2.deaths as deathIncrease from ustotal t1,ustotal t2 where t1.date = date_add(t2.date,1)")
     
    df2.sort(df2["date"].asc()).repartition(1).write.json("r2.json")           #写入hdfs
     
    #3.统计截止5.19日 美国各州的累计确诊人数和死亡人数
    df3 = spark.sql("select date,state,sum(cases) as totalCases,sum(deaths) as totalDeaths,round(sum(deaths)/sum(cases),4) as deathRate from usInfo  where date = to_date('2020-05-19','yyyy-MM-dd') group by date,state")
     
    df3.sort(df3["totalCases"].desc()).repartition(1).write.json("r3.json") #写入hdfs
     
    df3.createOrReplaceTempView("eachStateInfo")
     
    #4.找出美国确诊最多的10个州
    df4 = spark.sql("select date,state,totalCases from eachStateInfo  order by totalCases desc limit 10")
    df4.repartition(1).write.json("r4.json")
     
    #5.找出美国死亡最多的10个州
    df5 = spark.sql("select date,state,totalDeaths from eachStateInfo  order by totalDeaths desc limit 10")
    df5.repartition(1).write.json("r5.json")
     
    #6.找出美国确诊最少的10个州
    df6 = spark.sql("select date,state,totalCases from eachStateInfo  order by totalCases asc limit 10")
    df6.repartition(1).write.json("r6.json")
     
    #7.找出美国死亡最少的10个州
    df7 = spark.sql("select date,state,totalDeaths from eachStateInfo  order by totalDeaths asc limit 10")
    df7.repartition(1).write.json("r7.json")
     
    #8.统计截止5.19全美和各州的病死率
    df8 = spark.sql("select 1 as sign,date,'USA' as state,round(sum(totalDeaths)/sum(totalCases),4) as deathRate from eachStateInfo group by date union select 2 as sign,date,state,deathRate from eachStateInfo").cache()
    df8.sort(df8["sign"].asc(),df8["deathRate"].desc()).repartition(1).write.json("r8.json")

3.2 读取文件至 DataFrame

3.2 读取文件至 DataFrame

复制代码
    from pyspark import SparkConf,SparkContext
    from pyspark.sql import Row
    from pyspark.sql.types import *
    from pyspark.sql import SparkSession
    from datetime import datetime
    import pyspark.sql.functions as func
     
    def toDate(inputStr):
    newStr = ""
    if len(inputStr) == 8:
        s1 = inputStr[0:4]
        s2 = inputStr[5:6]
        s3 = inputStr[7]
        newStr = s1+"-"+"0"+s2+"-"+"0"+s3
    else:
        s1 = inputStr[0:4]
        s2 = inputStr[5:6]
        s3 = inputStr[7:]
        newStr = s1+"-"+"0"+s2+"-"+s3
    date = datetime.strptime(newStr, "%Y-%m-%d")
    return date
     
     
     
    #主程序:
    spark = SparkSession.builder.config(conf = SparkConf()).getOrCreate()
     
    fields = [StructField("date", DateType(),False),StructField("county", StringType(),False),StructField("state", StringType(),False),
                    StructField("cases", IntegerType(),False),StructField("deaths", IntegerType(),False),]
    schema = StructType(fields)
     
    rdd0 = spark.sparkContext.textFile("/user/hadoop/us-counties.txt")
    rdd1 = rdd0.map(lambda x:x.split("\t")).map(lambda p: Row(toDate(p[0]),p[1],p[2],int(p[3]),int(p[4])))
     
    shemaUsInfo = spark.createDataFrame(rdd1,schema)
     
    shemaUsInfo.createOrReplaceTempView("usInfo")

3.3进行数据分析
本次实验主要统计以下8个指标:

统计每天美国的确诊和死亡病例总数。该方法是通过按日期字段分组后对cases和deaths字段进行汇总计算来实现的。
统计每天美国的确诊病例数和死亡病例数的变化情况。由于新增病例数量等于当前日期与前一日期之差值(即新增数=今日数-昨日数),因此可以通过自连接操作来实现这一目标。具体操作中需将t1表中的date字段与t2表中的date字段加1后相连接,并在t1表的totalCases字段中减去t2表的totalCases字段的结果来计算每日新增案例数目。
截至5月19日的数据统计显示,在全美各州中累计确诊人数最多的十个州的情况如下:通过筛选5月19日的数据集并按cases字段降序排列后取前十个州即可获得结果。
同样地,在全美各州中累计死亡人数最多的十个州的情况可以通过筛选5月19日的数据集并按deaths字段降序排列后取前十个州的方式来获得数据结果。
为了观察全美各州中确诊人数最少的情况,在筛选出5月19日数据的基础上按cases字段升序排列后再选取前十名即可得到所需信息。
同样地,在全美各州中观察死亡人数最少的情况,则需对筛选出的5月19日数据集按deaths字段升序排列后再选取前十名的数据即可完成分析。
在分析上述各项指标时既采用了基于DataFrame自带函数的操作方法也应用了Spark SQL技术来进行数据处理操作以提高效率

复制代码
     ./bin/hdfs dfs -get /user/hadoop/r1.json/*.json /home/hadoop/result/r1

其余结果文件使用的命令与上面相同,修改路径即可。

在这里插入图片描述

运行完命令之后可在目录/home/hadoop/result中看到结果文件。

四、数据可视化

配置第三方库pyecharts用于实现数据可视化。具体操作步骤如后所示:
在终端输入以下命令完成配置:
bash pip install pyecharts

复制代码
    pip install pyecharts

通过Jupyter Notebook这一工具实现数据处理过程,并将以下代码复制到该环境中运行

复制代码
    from pyecharts import options as opts
    from pyecharts.charts import Bar
    from pyecharts.charts import Line
    from pyecharts.components import Table
    from pyecharts.charts import WordCloud
    from pyecharts.charts import Pie
    from pyecharts.charts import Funnel
    from pyecharts.charts import Scatter
    from pyecharts.charts import PictorialBar
    from pyecharts.options import ComponentTitleOpts
    from pyecharts.globals import SymbolType
    import json
     
     
     
    #1.画出每日的累计确诊病例数和死亡数——>双柱状图
    def drawChart_1(index):
    root = "/home/hadoop/result/r" + str(index) 
    date = []
    cases = []
    deaths = []
    with open(root, 'r') as f:
        while True:
            line = f.readline()
            if not line:                            # 到 EOF,返回空字符串,则终止循环
                break
            js = json.loads(line)
            date.append(str(js['date']))
            cases.append(int(js['cases']))
            deaths.append(int(js['deaths']))
     
    d = (
    Bar()
    .add_xaxis(date)
    .add_yaxis("累计确诊人数", cases, stack="stack1")
    .add_yaxis("累计死亡人数", deaths, stack="stack1")
    .set_series_opts(label_opts=opts.LabelOpts(is_show=False))
    .set_global_opts(title_opts=opts.TitleOpts(title="美国每日累计确诊和死亡人数"))
    .render("/home/hadoop/result/r1.html")
    )
     
     
    #2.画出每日的新增确诊病例数和死亡数——>折线图
    def drawChart_2(index):
    root = "/home/hadoop/result/r" + str(index) 
    date = []
    cases = []
    deaths = []
    with open(root, 'r') as f:
        while True:
            line = f.readline()
            if not line:                            # 到 EOF,返回空字符串,则终止循环
                break
            js = json.loads(line)
            date.append(str(js['date']))
            cases.append(int(js['caseIncrease']))
            deaths.append(int(js['deathIncrease']))
     
    (
    Line(init_opts=opts.InitOpts(width="1600px", height="800px"))
    .add_xaxis(xaxis_data=date)
    .add_yaxis(
        series_name="新增确诊",
        y_axis=cases,
        markpoint_opts=opts.MarkPointOpts(
            data=[
                opts.MarkPointItem(type_="max", name="最大值")
     
            ]
        ),
        markline_opts=opts.MarkLineOpts(
            data=[opts.MarkLineItem(type_="average", name="平均值")]
        ),
    )
    .set_global_opts(
        title_opts=opts.TitleOpts(title="美国每日新增确诊折线图", subtitle=""),
        tooltip_opts=opts.TooltipOpts(trigger="axis"),
        toolbox_opts=opts.ToolboxOpts(is_show=True),
        xaxis_opts=opts.AxisOpts(type_="category", boundary_gap=False),
    )
    .render("/home/hadoop/result/r1.html")
    )
    (
    Line(init_opts=opts.InitOpts(width="1600px", height="800px"))
    .add_xaxis(xaxis_data=date)
    .add_yaxis(
        series_name="新增死亡",
        y_axis=deaths,
        markpoint_opts=opts.MarkPointOpts(
            data=[opts.MarkPointItem(type_="max", name="最大值")]
        ),
        markline_opts=opts.MarkLineOpts(
            data=[
                opts.MarkLineItem(type_="average", name="平均值"),
                opts.MarkLineItem(symbol="none", x="90%", y="max"),
                opts.MarkLineItem(symbol="circle", type_="max", name="最高点"),
            ]
        ),
    )
    .set_global_opts(
        title_opts=opts.TitleOpts(title="美国每日新增死亡折线图", subtitle=""),
        tooltip_opts=opts.TooltipOpts(trigger="axis"),
        toolbox_opts=opts.ToolboxOpts(is_show=True),
        xaxis_opts=opts.AxisOpts(type_="category", boundary_gap=False),
    )
    .render("/home/hadoop/result/r2.html")
    )
     
     
     
     
    #3.画出截止5.19,美国各州累计确诊、死亡人数和病死率--->表格
    def drawChart_3(index):
    root = "/home/hadoop/result/r" + str(index) 
    allState = []
    with open(root, 'r') as f:
        while True:
            line = f.readline()
            if not line:                            # 到 EOF,返回空字符串,则终止循环
                break
            js = json.loads(line)
            row = []
            row.append(str(js['state']))
            row.append(int(js['totalCases']))
            row.append(int(js['totalDeaths']))
            row.append(float(js['deathRate']))
            allState.append(row)
     
    table = Table()
     
    headers = ["State name", "Total cases", "Total deaths", "Death rate"]
    rows = allState
    table.add(headers, rows)
    table.set_global_opts(
        title_opts=ComponentTitleOpts(title="美国各州疫情一览", subtitle="")
    )
    table.render("/home/hadoop/result/r3.html")
     
     
    #4.画出美国确诊最多的10个州——>词云图
    def drawChart_4(index):
    root = "/home/hadoop/result/r" + str(index) 
    data = []
    with open(root, 'r') as f:
        while True:
            line = f.readline()
            if not line:                            # 到 EOF,返回空字符串,则终止循环
                break
            js = json.loads(line)
            row=(str(js['state']),int(js['totalCases']))
            data.append(row)
     
    c = (
    WordCloud()
    .add("", data, word_size_range=[20, 100], shape=SymbolType.DIAMOND)
    .set_global_opts(title_opts=opts.TitleOpts(title="美国各州确诊Top10"))
    .render("/home/hadoop/result/r4.html")
    )
     
     
     
     
    #5.画出美国死亡最多的10个州——>象柱状图
    def drawChart_5(index):
    root = "/home/hadoop/result/r" + str(index) 
    state = []
    totalDeath = []
    with open(root, 'r') as f:
        while True:
            line = f.readline()
            if not line:                            # 到 EOF,返回空字符串,则终止循环
                break
            js = json.loads(line)
            state.insert(0,str(js['state']))
            totalDeath.insert(0,int(js['totalDeaths']))
     
    c = (
    PictorialBar()
    .add_xaxis(state)
    .add_yaxis(
        "",
        totalDeath,
        label_opts=opts.LabelOpts(is_show=False),
        symbol_size=18,
        symbol_repeat="fixed",
        symbol_offset=[0, 0],
        is_symbol_clip=True,
        symbol=SymbolType.ROUND_RECT,
    )
    .reversal_axis()
    .set_global_opts(
        title_opts=opts.TitleOpts(title="PictorialBar-美国各州死亡人数Top10"),
        xaxis_opts=opts.AxisOpts(is_show=False),
        yaxis_opts=opts.AxisOpts(
            axistick_opts=opts.AxisTickOpts(is_show=False),
            axisline_opts=opts.AxisLineOpts(
                linestyle_opts=opts.LineStyleOpts(opacity=0)
            ),
        ),
    )
    .render("/home/hadoop/result/r5.html")
    )
     
     
     #6.找出美国确诊最少的10个州——>词云图
    def drawChart_6(index):
    root = "/home/hadoop/result/r" + str(index) 
    data = []
    with open(root, 'r') as f:
        while True:
            line = f.readline()
            if not line:                            # 到 EOF,返回空字符串,则终止循环
                break
            js = json.loads(line)
            row=(str(js['state']),int(js['totalCases']))
            data.append(row)
     
    c = (
    WordCloud()
    .add("", data, word_size_range=[100, 20], shape=SymbolType.DIAMOND)
    .set_global_opts(title_opts=opts.TitleOpts(title="美国各州确诊最少的10个州"))
    .render("/home/hadoop/result/r6.html")
    )
     
     
     
     
    #7.找出美国死亡最少的10个州——>漏斗图
    def drawChart_7(index):
    root = "/home/hadoop/result/r" + str(index) 
    data = []
    with open(root, 'r') as f:
        while True:
            line = f.readline()
            if not line:                            # 到 EOF,返回空字符串,则终止循环
                break
            js = json.loads(line)
            data.insert(0,[str(js['state']),int(js['totalDeaths'])])
     
    c = (
    Funnel()
    .add(
        "State",
        data,
        sort_="ascending",
        label_opts=opts.LabelOpts(position="inside"),
    )
    .set_global_opts(title_opts=opts.TitleOpts(title=""))
    .render("/home/hadoop/result/r7.html")
    )
     
     
    #8.美国的病死率--->饼状图
    def drawChart_8(index):
    root = "/home/hadoop/result/r" + str(index) 
    values = []
    with open(root, 'r') as f:
        while True:
            line = f.readline()
            if not line:                            # 到 EOF,返回空字符串,则终止循环
                break
            js = json.loads(line)
            if str(js['state'])=="USA":
                values.append(["Death(%)",round(float(js['deathRate'])*100,2)])
                values.append(["No-Death(%)",100-round(float(js['deathRate'])*100,2)])
    c = (
    Pie()
    .add("", values)
    .set_colors(["blcak","orange"])
    .set_global_opts(title_opts=opts.TitleOpts(title="全美的病死率"))
    .set_series_opts(label_opts=opts.LabelOpts(formatter="{b}: {c}"))
    .render("/home/hadoop/result/r8.html")
    )
     
     
    #可视化主程序:
    index = 1
    while index<9:
    funcStr = "drawChart_" + str(index)
    eval(funcStr)(index)
    index+=1

运行的时候能够在目录/home/hadoop/result中看到有.html的结果文件

在这里插入图片描述

(2)美国每日的新增确诊病例数——折线图(r2.html)

在这里插入图片描述

(3)美国每日的新增死亡病例数——折线图(r3.html)

在这里插入图片描述

(4)截止5.19,美国各州累计确诊、死亡人数和病死率——表格(r4.html)

在这里插入图片描述

(5)截止5.19,美国累计确诊人数前10的州——词云图(r5.html)

在这里插入图片描述

(6)截止5.19,美国累计死亡人数前10的州——象柱状图(r6.html)

在这里插入图片描述

(7)截止5.19,美国累计确诊人数最少的10个州——词云图(r7.html)

在这里插入图片描述

(8)截止5.19,美国累计死亡人数最少的10个州——漏斗图(r8.html)

在这里插入图片描述

(9)截止5.19,美国的病死率——饼状图(r9.html)

在这里插入图片描述

五、心得体会

在本次实验中,最为棘手的部分莫过于Jupyter Notebook的搭建与配置过程了,在完成PySpark交互环节时也遇到了不少挑战。整个过程中容易遇到的问题包括文件丢失的情况,在运行代码前建议先确保Hadoop服务已启动。如果系统无法正常连接到远程服务,则会因连接拒绝而出现问题。希望这篇文章能为您提供一些参考与帮助。

全部评论 (0)

还没有任何评论哟~