Advertisement

2020年美国新冠肺炎疫情数据分析

阅读量:

2020年美国新冠肺炎疫情数据分析

  • 实验目的

  • 实验环境

    • 实验环境问题与解决
    • 解决方法
  • 数据集的获取与导入过程随后发送至HDFS系统中

    • 完成了数据集的下载操作
    • 开始对数据格式进行转换处理
    • 经过处理的数据文件随后被传输至HDFS存储系统中

利用Spark进行数据分析处理。
基于目标需求运用Spark处理数据。
检查HDFS分布式存储中的结果文件位置。
将分发的分析结果传输至用户的存储目录中。
在/homeradoop目录中建立专门用于存放分析结果的文件夹。
分别从HDF S获取分析数据并将其传输至用户的本地存储目录中。
将分发的数据重命名并保存为part-00000.json格式。

  • 使用pyecharts数据可视化

    • 安装pyecharts
    • 实现可视化数据处理
  • 可视化结果

实验目的

本研究旨在分析截至美东时间(北京时间)

全美及各州的每日新增确诊病例数量

实验环境

(1)Linux:Ubuntu Kylin 14.04
(2)Hadoop2.10.0
(3)Python:3.6
(4)Spark:2.4.0

实验环境问题与解决

Ubuntu Kylin 14.04系统预装了Pythons 2.7和3.4两个版本;其中Pythons 3.4版本由于兼容性问题无法安装最新的pip工具;此外基于Pythons第三方可视化库pyecharts也可能会因兼容性原因而无法正常运行

解决方法

Ubuntu 14可以通过软件源获取Python 3.6版本,并且这也是当前pip支持的最低版本。请运行以下代码以检查当前Python版本是否为3.6及更高版本:

复制代码
    python3 -V

安装Python3.6

复制代码
    sudo add-apt-repository ppa:deadsnakes/ppa
    sudo apt-get update
    sudo apt-get install python3.6
    sudo apt-get install python3.6-gdbm

Python3重新指向Python3.6

复制代码
    sudo rm /usr/bin/python3
    sudo ln -s python3.6 /usr/bin/python3

python3.6安装pip

复制代码
    wget https://bootstrap.pypa.io/pip/get-pip.py
    sudo pyhon3 get-pip.py

在安装pip时出现错误信息显示:‘/usr/lib/python3/dist-packages/pkg_resources.py’错误,请运行以下命令

复制代码
    sudo vim /usr/lib/python3/dist-packages/pkg_resources.py

将此段代码的执行逻辑进行聚焦,并对以下代码块进行注释隐藏:importlib.importlib._bootstrap as importlib_bootstrap;并在下面设置importlib_bootstrap = None。

在这里插入图片描述

保存退出,重新安装pip

数据集获取和上传到HDFS

数据集下载

本次作业使用的数据集源自Kaggle平台上的美国新冠肺炎疫情数据分析库。该数据库按照dataframe格式存储在文件名us-counties.csv中,在此期间记录了自2020年1月21日以来(截止至美国首次报告新冠肺炎确诊病例的日期)的相关信息。具体包含以下字段:

记录日期 行政区域 Snohomish;华盛顿州

在这里插入图片描述

将数据集us-counties.csv放在/home/hadoop下

数据格式转换

原始数据集采用.csv格式存储。为了便于spark解析生成RDD或DataFrame,随后将us-counties.csv转换为.txt格式文件命名为us-counties.txt。使用Python语言完成转换操作,并将相关代码实现于toTxt.py文件中。具体代码如下:

复制代码
    import pandas as pd
     
    #.csv->.txt
    data = pd.read_csv('/home/hadoop/us-counties.csv')
    with open('/home/hadoop/us-counties.txt','a+',encoding='utf-8') as f:
    for line in data.values:
        f.write((str(line[0])+'\t'+str(line[1])+'\t'
                +str(line[2])+'\t'+str(line[3])+'\t'+str(line[4])+'\n'))

应有Python装配pandas模块方能成功执行数据转换,并需以Python3版本运行代码;具体安装步骤如下:

复制代码
    sudo pip install pandas
    sudo pip3.6 install pandas

数据文件上传至HDFS中

启动Hadoop之后执行以下操作步骤:将本地文件系统的 /home/hadoop/us-counties.txt 传输至 HDFS 存储位置 /user/hadoop/us-counties.txt。

复制代码
    ssh localhost
    cd /usr/local/hadoop
    ./sbin/start-dfs.sh
    ./bin/hdfs dfs -put /home/hadoop/us-counties.txt /user/hadoop

使用Spark对数据分析处理

通过Spark根据目标要求处理数据

本部分操作的完整实验代码存放在了analyst.py中,具体如下:

复制代码
    from pyspark import SparkConf,SparkContext
    from pyspark.sql import Row
    from pyspark.sql.types import *
    from pyspark.sql import SparkSession
    from datetime import datetime
    import pyspark.sql.functions as func
     
    def toDate(inputStr):
    newStr = ""
    if len(inputStr) == 8:
        s1 = inputStr[0:4]
        s2 = inputStr[5:6]
        s3 = inputStr[7]
        newStr = s1+"-"+"0"+s2+"-"+"0"+s3
    else:
        s1 = inputStr[0:4]
        s2 = inputStr[5:6]
        s3 = inputStr[7:]
        newStr = s1+"-"+"0"+s2+"-"+s3
    date = datetime.strptime(newStr, "%Y-%m-%d")
    return date
     
     
     
    #主程序:
    spark = SparkSession.builder.config(conf = SparkConf()).getOrCreate()
     
    fields = [StructField("date", DateType(),False),StructField("county", StringType(),False),StructField("state", StringType(),False),
                    StructField("cases", IntegerType(),False),StructField("deaths", IntegerType(),False),]
    schema = StructType(fields)
     
    rdd0 = spark.sparkContext.textFile("/user/hadoop/us-counties.txt")
    rdd1 = rdd0.map(lambda x:x.split("\t")).map(lambda p: Row(toDate(p[0]),p[1],p[2],int(p[3]),int(p[4])))
     
     
    shemaUsInfo = spark.createDataFrame(rdd1,schema)
     
    shemaUsInfo.createOrReplaceTempView("usInfo")
     
    #1.计算每日的累计确诊病例数和死亡数
    df = shemaUsInfo.groupBy("date").agg(func.sum("cases"),func.sum("deaths")).sort(shemaUsInfo["date"].asc())
     
    #列重命名
    df1 = df.withColumnRenamed("sum(cases)","cases").withColumnRenamed("sum(deaths)","deaths")
    df1.repartition(1).write.json("result1.json")                               #写入hdfs
     
    #注册为临时表供下一步使用
    df1.createOrReplaceTempView("ustotal")
     
    #2.计算每日较昨日的新增确诊病例数和死亡病例数
    df2 = spark.sql("select t1.date,t1.cases-t2.cases as caseIncrease,t1.deaths-t2.deaths as deathIncrease from ustotal t1,ustotal t2 where t1.date = date_add(t2.date,1)")
     
    df2.sort(df2["date"].asc()).repartition(1).write.json("result2.json")           #写入hdfs
     
    #3.统计截止5.19日 美国各州的累计确诊人数和死亡人数
    df3 = spark.sql("select date,state,sum(cases) as totalCases,sum(deaths) as totalDeaths,round(sum(deaths)/sum(cases),4) as deathRate from usInfo  where date = to_date('2020-05-19','yyyy-MM-dd') group by date,state")
     
    df3.sort(df3["totalCases"].desc()).repartition(1).write.json("result3.json") #写入hdfs
     
    df3.createOrReplaceTempView("eachStateInfo")
     
    #4.找出美国确诊最多的10个州
    df4 = spark.sql("select date,state,totalCases from eachStateInfo  order by totalCases desc limit 10")
    df4.repartition(1).write.json("result4.json")
     
    #5.找出美国死亡最多的10个州
    df5 = spark.sql("select date,state,totalDeaths from eachStateInfo  order by totalDeaths desc limit 10")
    df5.repartition(1).write.json("result5.json")
     
    #6.找出美国确诊最少的10个州
    df6 = spark.sql("select date,state,totalCases from eachStateInfo  order by totalCases asc limit 10")
    df6.repartition(1).write.json("result6.json")
     
    #7.找出美国死亡最少的10个州
    df7 = spark.sql("select date,state,totalDeaths from eachStateInfo  order by totalDeaths asc limit 10")
    df7.repartition(1).write.json("result7.json")
     
    #8.统计截止5.19全美和各州的病死率
    df8 = spark.sql("select 1 as sign,date,'USA' as state,round(sum(totalDeaths)/sum(totalCases),4) as deathRate from eachStateInfo group by date union select 2 as sign,date,state,deathRate from eachStateInfo").cache()
    df8.sort(df8["sign"].asc(),df8["deathRate"].desc()).repartition(1).write.json("result8.json")

用python3执行analyst.py

确认HDFS分布式文件系统中的结果文件

复制代码
    cd /usr/local/hadoop
    ./bin/hdfs dfs -ls

将HDFS分布式文件系统的结果下载到用户目录下

这些Spark计算的结果被存储为.json格式的文件,并非是为了展示而是为了便于后续进行可视化处理。与此同时,在发现通过Python读取HDFS文件系统较为不便的情况下,则选择将HDFS上的结果文件复制至本地文件系统中进行操作。

在/home/hadoop下创建用于存储结果文件的文件

复制代码
    cd /home/hadoop
    mkdir result
    cd result
    mkdir result1
    mkdir result2
    mkdir result3
    mkdir result4
    mkdir result5
    mkdir result6
    mkdir result7
    mkdir result8

分别将HDFS下的分析结果转储到用户目录下

复制代码
    cd /usr/local/hadoop
    ./bin/hdfs dfs -get /user/hadoop/result1.json/*.json /home/hadoop/result/result1

对于result2等结果文件,使用相同命令,只需要改一下路径即可

将分析结果更名为part-00000.json

复制代码
    cd /home/hadoop/result/result1
    mv part-00000*.json part-00000.json
在这里插入图片描述

使用pyecharts数据可视化

安装pyecharts

复制代码
    pip install pyecharts
    pip3.6 install pyecharts

实现可视化数据处理

可视化实现代码组织与showdata.py文件中。具体代码如下:

复制代码
    from pyecharts import options as opts
    from pyecharts.charts import Bar
    from pyecharts.charts import Line
    from pyecharts.components import Table
    from pyecharts.charts import WordCloud
    from pyecharts.charts import Pie
    from pyecharts.charts import Funnel
    from pyecharts.charts import Scatter
    from pyecharts.charts import PictorialBar
    from pyecharts.options import ComponentTitleOpts
    from pyecharts.globals import SymbolType
    import json
     
     
     
    #1.画出每日的累计确诊病例数和死亡数——>双柱状图
    def drawChart_1(index):
    root = "/home/hadoop/result/result" + str(index) +"/part-00000.json"
    date = []
    cases = []
    deaths = []
    with open(root, 'r') as f:
        while True:
            line = f.readline()
            if not line:                            # 到 EOF,返回空字符串,则终止循环
                break
            js = json.loads(line)
            date.append(str(js['date']))
            cases.append(int(js['cases']))
            deaths.append(int(js['deaths']))
     
    d = (
    Bar()
    .add_xaxis(date)
    .add_yaxis("累计确诊人数", cases, stack="stack1")
    .add_yaxis("累计死亡人数", deaths, stack="stack1")
    .set_series_opts(label_opts=opts.LabelOpts(is_show=False))
    .set_global_opts(title_opts=opts.TitleOpts(title="美国每日累计确诊和死亡人数"))
    .render("/home/hadoop/result/result1/result1.html")
    )
     
     
    #2.画出每日的新增确诊病例数和死亡数——>折线图
    def drawChart_2(index):
    root = "/home/hadoop/result/result" + str(index) +"/part-00000.json"
    date = []
    cases = []
    deaths = []
    with open(root, 'r') as f:
        while True:
            line = f.readline()
            if not line:                            # 到 EOF,返回空字符串,则终止循环
                break
            js = json.loads(line)
            date.append(str(js['date']))
            cases.append(int(js['caseIncrease']))
            deaths.append(int(js['deathIncrease']))
     
    (
    Line(init_opts=opts.InitOpts(width="1600px", height="800px"))
    .add_xaxis(xaxis_data=date)
    .add_yaxis(
        series_name="新增确诊",
        y_axis=cases,
        markpoint_opts=opts.MarkPointOpts(
            data=[
                opts.MarkPointItem(type_="max", name="最大值")
     
            ]
        ),
        markline_opts=opts.MarkLineOpts(
            data=[opts.MarkLineItem(type_="average", name="平均值")]
        ),
    )
    .set_global_opts(
        title_opts=opts.TitleOpts(title="美国每日新增确诊折线图", subtitle=""),
        tooltip_opts=opts.TooltipOpts(trigger="axis"),
        toolbox_opts=opts.ToolboxOpts(is_show=True),
        xaxis_opts=opts.AxisOpts(type_="category", boundary_gap=False),
    )
    .render("/home/hadoop/result/result2/result1.html")
    )
    (
    Line(init_opts=opts.InitOpts(width="1600px", height="800px"))
    .add_xaxis(xaxis_data=date)
    .add_yaxis(
        series_name="新增死亡",
        y_axis=deaths,
        markpoint_opts=opts.MarkPointOpts(
            data=[opts.MarkPointItem(type_="max", name="最大值")]
        ),
        markline_opts=opts.MarkLineOpts(
            data=[
                opts.MarkLineItem(type_="average", name="平均值"),
                opts.MarkLineItem(symbol="none", x="90%", y="max"),
                opts.MarkLineItem(symbol="circle", type_="max", name="最高点"),
            ]
        ),
    )
    .set_global_opts(
        title_opts=opts.TitleOpts(title="美国每日新增死亡折线图", subtitle=""),
        tooltip_opts=opts.TooltipOpts(trigger="axis"),
        toolbox_opts=opts.ToolboxOpts(is_show=True),
        xaxis_opts=opts.AxisOpts(type_="category", boundary_gap=False),
    )
    .render("/home/hadoop/result/result2/result2.html")
    )
     
     
     
     
    #3.画出截止5.19,美国各州累计确诊、死亡人数和病死率--->表格
    def drawChart_3(index):
    root = "/home/hadoop/result/result" + str(index) +"/part-00000.json"
    allState = []
    with open(root, 'r') as f:
        while True:
            line = f.readline()
            if not line:                            # 到 EOF,返回空字符串,则终止循环
                break
            js = json.loads(line)
            row = []
            row.append(str(js['state']))
            row.append(int(js['totalCases']))
            row.append(int(js['totalDeaths']))
            row.append(float(js['deathRate']))
            allState.append(row)
     
    table = Table()
     
    headers = ["State name", "Total cases", "Total deaths", "Death rate"]
    rows = allState
    table.add(headers, rows)
    table.set_global_opts(
        title_opts=ComponentTitleOpts(title="美国各州疫情一览", subtitle="")
    )
    table.render("/home/hadoop/result/result3/result1.html")
     
     
    #4.画出美国确诊最多的10个州——>词云图
    def drawChart_4(index):
    root = "/home/hadoop/result/result" + str(index) +"/part-00000.json"
    data = []
    with open(root, 'r') as f:
        while True:
            line = f.readline()
            if not line:                            # 到 EOF,返回空字符串,则终止循环
                break
            js = json.loads(line)
            row=(str(js['state']),int(js['totalCases']))
            data.append(row)
     
    c = (
    WordCloud()
    .add("", data, word_size_range=[20, 100], shape=SymbolType.DIAMOND)
    .set_global_opts(title_opts=opts.TitleOpts(title="美国各州确诊Top10"))
    .render("/home/hadoop/result/result4/result1.html")
    )
     
     
     
     
    #5.画出美国死亡最多的10个州——>象柱状图
    def drawChart_5(index):
    root = "/home/hadoop/result/result" + str(index) +"/part-00000.json"
    state = []
    totalDeath = []
    with open(root, 'r') as f:
        while True:
            line = f.readline()
            if not line:                            # 到 EOF,返回空字符串,则终止循环
                break
            js = json.loads(line)
            state.insert(0,str(js['state']))
            totalDeath.insert(0,int(js['totalDeaths']))
     
    c = (
    PictorialBar()
    .add_xaxis(state)
    .add_yaxis(
        "",
        totalDeath,
        label_opts=opts.LabelOpts(is_show=False),
        symbol_size=18,
        symbol_repeat="fixed",
        symbol_offset=[0, 0],
        is_symbol_clip=True,
        symbol=SymbolType.ROUND_RECT,
    )
    .reversal_axis()
    .set_global_opts(
        title_opts=opts.TitleOpts(title="PictorialBar-美国各州死亡人数Top10"),
        xaxis_opts=opts.AxisOpts(is_show=False),
        yaxis_opts=opts.AxisOpts(
            axistick_opts=opts.AxisTickOpts(is_show=False),
            axisline_opts=opts.AxisLineOpts(
                linestyle_opts=opts.LineStyleOpts(opacity=0)
            ),
        ),
    )
    .render("/home/hadoop/result/result5/result1.html")
    )
     
     
     
    #6.找出美国确诊最少的10个州——>词云图
    def drawChart_6(index):
    root = "/home/hadoop/result/result" + str(index) +"/part-00000.json"
    data = []
    with open(root, 'r') as f:
        while True:
            line = f.readline()
            if not line:                            # 到 EOF,返回空字符串,则终止循环
                break
            js = json.loads(line)
            row=(str(js['state']),int(js['totalCases']))
            data.append(row)
     
    c = (
    WordCloud()
    .add("", data, word_size_range=[100, 20], shape=SymbolType.DIAMOND)
    .set_global_opts(title_opts=opts.TitleOpts(title="美国各州确诊最少的10个州"))
    .render("/home/hadoop/result/result6/result1.html")
    )
     
     
     
     
    #7.找出美国死亡最少的10个州——>漏斗图
    def drawChart_7(index):
    root = "/home/hadoop/result/result" + str(index) +"/part-00000.json"
    data = []
    with open(root, 'r') as f:
        while True:
            line = f.readline()
            if not line:                            # 到 EOF,返回空字符串,则终止循环
                break
            js = json.loads(line)
            data.insert(0,[str(js['state']),int(js['totalDeaths'])])
     
    c = (
    Funnel()
    .add(
        "State",
        data,
        sort_="ascending",
        label_opts=opts.LabelOpts(position="inside"),
    )
    .set_global_opts(title_opts=opts.TitleOpts(title=""))
    .render("/home/hadoop/result/result7/result1.html")
    )
     
     
    #8.美国的病死率--->饼状图
    def drawChart_8(index):
    root = "/home/hadoop/result/result" + str(index) +"/part-00000.json"
    values = []
    with open(root, 'r') as f:
        while True:
            line = f.readline()
            if not line:                            # 到 EOF,返回空字符串,则终止循环
                break
            js = json.loads(line)
            if str(js['state'])=="USA":
                values.append(["Death(%)",round(float(js['deathRate'])*100,2)])
                values.append(["No-Death(%)",100-round(float(js['deathRate'])*100,2)])
    c = (
    Pie()
    .add("", values)
    .set_colors(["blcak","orange"])
    .set_global_opts(title_opts=opts.TitleOpts(title="全美的病死率"))
    .set_series_opts(label_opts=opts.LabelOpts(formatter="{b}: {c}"))
    .render("/home/hadoop/result/result8/result1.html")
    )
     
     
    #可视化主程序:
    index = 1
    while index<9:
    funcStr = "drawChart_" + str(index)
    eval(funcStr)(index)
    index+=1

使用python3执行showdata.py

可视化结果

可视化结果采用.html格式的;其中result1的结果展示图的保存路径为"/home/hadoop/result/result1result1.html";result2的结果展示图的保存路径为"/home/hadoop/resultresult2$result1.html";其余类似情况则通过递推完成;具体截图如下:

(1)美国每日的累计确诊病例数和死亡数——>双柱状图

在这里插入图片描述

(2)美国每日的新增确诊病例数——>折线图

在这里插入图片描述

(3)美国每日的新增死亡病例数——>折线图

在这里插入图片描述

(4)截止5.19,美国各州累计确诊、死亡人数和病死率—>表格

在这里插入图片描述

(5)截止5.19,美国累计确诊人数前10的州—>词云图

在这里插入图片描述

(6)截止5.19,美国累计死亡人数前10的州—>象柱状图

在这里插入图片描述

(7)截止5.19,美国累计确诊人数最少的10个州—>词云图

在这里插入图片描述

(8)截止5.19,美国累计死亡人数最少的10个州—>漏斗图

在这里插入图片描述

(9)截止5.19,美国的病死率—>饼状图

在这里插入图片描述

全部评论 (0)

还没有任何评论哟~