针对新冠肺炎疫情的Python疫情数据爬取(基于requests和pandas)
数据源选择
新闻媒体的报道渠道可作为数据来源之一,并以网易疫情防控报道平台为例进行说明(如下图界面所示)。其数据内容极为丰富且具有多样性:不仅包含国内疫情相关信息的数据信息,并且也涵盖了国外疫情动态的数据反馈。由于该平台是行业内知名的主流媒体报道机构,在公信度方面具有较高的可信度保证。因此我们决定采用网易疫情防控实时动态报道平台作为本研究的数据来源(见下文链接):https://wp.m.163.com/163/page/news/virus_report/index.html?_nw_ =1&anw =1
基于网易实时播报平台的数据获取过程中,由于该平台作为实时更新的网络型平台,在'Network'标签中通常能找到所需的数据.例如,在使用Chrome浏览器时演示了如何进行数据查找的过程.
- 访问网易提供的实时疫情信息更新平台
- 通过右键点击页面任意位置执行网络检查操作
- 切换至Network标签下的XHR设置界面后,在此界面中通常会看到刷新提示窗口。通过按下键盘上的'Ctrl+R'快捷键即可完成刷新操作。
- 在网页检查结果界面的左下角突出显示区域中, 我们找到了目标数据源所在的位置, 通过该工具能够自动采集并获取对应地址的信息内容。
- 我们的最终目标是实现对全国范围内的实时数据以及各国和各省份的历史数据进行获取。

初步探索
经过比对发现,在第二个目标地址中存储了疫情相关数据信息。因此我们首先对该地址进行数据抓取操作。接着定位到该地址的具体位置信息,并点击对应的headers信息后查看,在url路径中的问号之后部分即代表参数配置项,默认情况下无需手动设置即可获取数据。因此我们需要请求的目标地址为:https://c.m.163.com/ug/api/wuhan/app/data/list-total,并且可以看到该接口采用的是get方法获取资源。为了确保请求的安全性,请访问浏览器的User-Agent属性,并在使用requests库发送请求时,请设置headers头中的User-Agent字段以模仿真实用户身份信息进行访问

随后发起网络请求:导入所需的Python库(如os, requests, pandas等),通过requests库发送网络请求,并借助pandas模块组织数据存储。设置相应的HTTP头部信息以模仿浏览器行为,并执行HTTP GET/POST请求;同时监控响应状态信息以评估连接是否成功
import requests
import pandas as pd
import time
pd.set_option('max_rows',500)
url = "https://c.m.163.com/ug/api/wuhan/app/data/list-total" # 定义要访问的地址
r = requests.get(url,headers={'user-agent': '填写自己的user-agent'}) # 使用requests发起请求
print(r.status_code) # 查看请求状态
print(type(r.text))
print(len(r.text))
可以看到返回后的内容是一个几十万长度的字符串,并且在网页预览中观察到数据呈现为类似字典的json格式结构。因此我们将其转换为json格式存储以便后续分析
可以看到返回的结果是一个长度约为几十万的字符串
import json
data_json = json.loads(r.text)
type(data_json)
data_json.keys()
可以看出在data中存放着我们需要的数据,因此我们取出数据。
data = data_json['data'] # 取出json中的数据
data.keys()
数据中总共有四个键,每个键存储着不同的内容:

接下来分别获取实时数据和历史数据。
直接上代码了,具体不懂可以评论讨论。
import requests
import pandas as pd
import time
import json
# 将提取数据的方法封装为函数
def get_data(data, info_list):
info = pd.DataFrame(data)[info_list] # 主要信息
today_data = pd.DataFrame([i['today'] for i in data]) # 提取today的数据
today_data.______ = ['today_' + i for i in today_data.columns] # 修改列名 columns
total_data = pd.DataFrame([i['total'] for i in data]) # 提取total的数据
total_data.______ = ['total_' + i for i in total_data.columns] # 修改列名 columns
return pd.concat([info, total_data, today_data], axis=1) # info、today和total横向合并最终得到汇总的数据
def save_data(data,name): # 定义保存数据方法
file_name = name+'_'+time.strftime('%Y_%m_%d',time.localtime(time.time()))+'.csv'
data.to_csv(file_name,index=None,encoding='utf_8_sig')
print(file_name+' 保存成功!')
def main():
pd.set_option('max_rows',500)
url = "https://c.m.163.com/ug/api/wuhan/app/data/list-total"
headers = {'user-agent': '这里随便写入一个user-agent'}
r = requests.get(url,headers=headers)
# print(r.status_code)
# print(type(r.text))
# print(len(r.text))
data_json = json.loads(r.text)
type(data_json)
data_json.keys()
data = data_json['data'] # 取出json中的数据
# 中国各省的实时数据爬取
data_province = data['areaTree'][2]['children']
today_province = get_data(data_province,['id','lastUpdateTime','name'])
save_data(today_province, 'today_province')
# 世界各国实时数据爬取
areaTree = data['areaTree'] # 取出areaTree
today_world = get_data(areaTree, ['id', 'lastUpdateTime', 'name'])
today_world.head()
# 中国历史数据爬取
chinaDayList = data['chinaDayList']
alltime_China = get_data(chinaDayList, ['date', 'lastUpdateTime'])
save_data(alltime_China, 'alltime_China')
# 中国各省历史数据爬取
province_dict = {num:name for num,name in zip(today_province['id'],today_province['name'])}
start = time.time()
for province_id in province_dict: # 遍历各省编号
try:
# 按照省编号访问每个省的数据地址,并获取json数据
url_1 = 'https://c.m.163.com/ug/api/wuhan/app/data/list-by-area-code?areaCode=' + province_id
r = requests.get(url_1, headers=headers)
data_json = json.loads(r.text)
# 提取各省数据,然后写入各省名称
province_data = get_data(data_json['data']['list'], ['date'])
province_data['name'] = province_dict[province_id]
# 合并数据
if province_id == '420000':
alltime_province = province_data
else:
alltime_province = pd.DataFrame([alltime_province, province_data])
print('-' * 20, province_dict[province_id], '成功',
province_data.shape, alltime_province.shape,
',累计耗时:', round(time.time() - start), '-' * 20)
# 设置延迟等待
time.sleep(10)
except:
print('-' * 20, province_dict[province_id], 'wrong', '-' * 20)
alltime_province.info()
save_data(alltime_province, 'alltime_province')
# 世界各国历史数据爬取
country_dict = {key: value for key, value in zip(today_world['id'], today_world['name'])}
start = time.time()
for country_id in country_dict: # 遍历每个国家的编号
try:
# 按照编号访问每个国家的数据地址,并获取json数据
url_2 = 'https://c.m.163.com/ug/api/wuhan/app/data/list-by-area-code?areaCode=' + country_id
r = requests.get(url_2, headers=headers)
json_data = json.loads(r.text)
# 生成每个国家的数据
country_data = get_data(json_data['data']['list'], ['date'])
country_data['name'] = country_dict[country_id]
# 数据叠加
if country_id == '9577772':
alltime_world = country_data
else:
alltime_world = pd.DataFrame([alltime_world, country_data])
print('-' * 20, country_dict[country_id], '成功', country_data.shape, alltime_world.shape,
',累计耗时:', round(time.time() - start), '-' * 20)
time.sleep(10)
except:
print('-' * 20, country_dict[country_id], 'wrong', '-' * 20)
print(alltime_world.shape)
save_data(alltime_world, 'alltime_world')
main()
声明:本博客内容为学习酷客上相关案例后所做总结。
