基于python的疫情数据分析思路,用python获取中国每天的疫情数据
本文主要介绍python清洗疫情的历史数据,包括数据采集方法和用python读取csv的详细代码。本文通过示例代码给你做了非常详细的介绍,对你的学习或工作有一定的参考价值。有需要的朋友可以参考一下。
00-1010 1.数据采集2。使用python读取csv3。使用pyhon清理数据4。自动将清理后的数据导入MySql。2020年大三的一次大训练作业中,我搭建了一个新冠肺炎疫情的数据采集和可视化分析系统,大致就是先找到数据,然后导入hive,再用hive清洗数据。然后用hql将清洗后的数据导入MySql,再用ssm开发后台数据接口,然后在前端用echarts和tables可视化数据。你可以查一下:当时的https://www.jb51.net/article/179889.htm.主要是要求用hive来处理数据,但是当时的数据是来自一个大老板的数据接口。最后用hive处理导入数据库,真的是大材小用了。所以数据处理不合适,其他数据处理和数据可视化都不错。
这次有个小伙伴也想做一个疫情数据采集可视化系统。他想向我学习我以前的做法,让我给你一些建议。那么问题来了:之前的数据比较少,可以直接从网上提供的免费接口获取。现在,疫情已经过去两年多了。如果要整理历史上各省市每天的数据,数据会比较庞大,而且几乎没有现成的满足功能的接口。因此,我做了以下工作来获取和处理数据:
目录
数据来源是我在GitHub上长期收集的这个项目:https://lab.isaaclin.cn/nCoV/.
数据链:https://github.com/BlankerL/DXY-COVID-19-Data/releases
此外,还部署了数据仓库。每天0: 00会准时执行程序,推送数据发布。
我们可以直接从达闼的数据仓库下载现成爬虫爬取的数据,数据可以csv格式直接下载到DXYArea.csv,方便处理。
下载打开后,你会发现这个92MB的文件包含了近100W条数据。直接看会有点慢。
所以,这个时候我想到可以尝试用python的熊猫来分块读取数据。这个工具对于数据处理非常方便,可以快速读取数据。
1. 数据获取
选择使用熊猫模块读取csv,并使用原生读取非常慢的注:py脚本文件和csv文件放在同一目录下.
进口熊猫作为pd
将numpy作为np导入
读取的文件数
filePath=DXYArea.csv
#获取数据
def read_csv_feature(文件路径):
#读取文件
f=打开(文件路径,编码=utf-8 )
reader=pd.read_csv(f,sep=,,iterator=True)
循环=真
chunkSize=1000000
chunks=[]
while loop:
尝试:
chunk=reader . get _ chunk(chunk size)
chunks.append(chunk)
StopIteration:除外
循环=假
df=pd.concat(chunks,axis=0,ignore_index=True)
f.close()
返回df
数据=读取csv特征(文件路径)
打印(“数据读取成功-”)
csv数据读取成功后,都存在于数据中,这个数据就是一个数据集。
可以使用numpy模块工具过滤数据集,导出并转换成列表,方便数据操作。
country name=NP . array(data[ country name ])
country English name=NP . array(data[ country English name ])
省名=
np.array(data["provinceName"])
province_confirmedCount = np.array(data["province_confirmedCount"])
province_curedCount = np.array(data["province_curedCount"])
province_deadCount = np.array(data["province_deadCount"])
updateTime = np.array(data["updateTime"])
cityName = np.array(data["cityName"])
city_confirmedCount = np.array(data["city_confirmedCount"])
city_curedCount = np.array(data["city_curedCount"])
city_deadCount = np.array(data["city_deadCount"])
这样就把所有需要用到的数据筛选出来了。
3.使用pyhon进行数据清洗
这里的清洗我还是使用了笨方法,很直接暴力的把数据装进对应的list中:
# 全国历史数据historyed = list()
# 全国最新数据
totaled = list()
# province最新数据
provinceed = list()
# area最新数据
areaed = list()
for i in range(len(data)):
if(countryName[i] == "中国"):
updatetimeList = str(updateTime[i]).split( )
updatetime = updatetimeList[0]
# 处理historyed
historyed_temp = list()
if(provinceName[i] == "中国"):
# 处理totaled
if(len(totaled) == 0):
totaled.append(str(updateTime[i]))
totaled.append(int(province_confirmedCount[i]))
totaled.append(int(province_curedCount[i]))
totaled.append(int(province_deadCount[i]))
if((len(historyed) > 0) and (str(updatetime) != historyed[len(historyed) - 1][0])):
historyed_temp.append(str(updatetime))
historyed_temp.append(int(province_confirmedCount[i]))
historyed_temp.append(int(province_curedCount[i]))
historyed_temp.append(int(province_deadCount[i]))
if(len(historyed) == 0):
historyed_temp.append(str(updatetime))
historyed_temp.append(int(province_confirmedCount[i]))
historyed_temp.append(int(province_curedCount[i]))
historyed_temp.append(int(province_deadCount[i]))
if(len(historyed_temp) > 0):
historyed.append(historyed_temp)
# 处理areaed
areaed_temp = list()
if(provinceName[i] != "中国"):
if(provinceName[i] != "内蒙古自治区" and provinceName[i] != "黑龙江省"):
provinceName[i] = provinceName[i][0:2]
else:
provinceName[i] = provinceName[i][0:3]
flag = 1
for item in areaed:
if(item[1] == str(cityName[i])):
flag = 0
if(flag == 1):
areaed_temp.append(str(provinceName[i]))
areaed_temp.append(str(cityName[i]))
areaed_temp.append(int(0 if np.isnan(city_confirmedCount[i]) else city_confirmedCount[i]))
areaed_temp.append(int(0 if np.isnan(city_curedCount[i]) else city_curedCount[i]))
areaed_temp.append(int(0 if np.isnan(city_deadCount[i]) else city_deadCount[i]))
areaed.append(areaed_temp)
flag = 1
for item in areaed_tmp:
if(item[0] == str(provinceName[i])):
flag = 0
if(flag == 1):
areaed_temp.append(str(provinceName[i]))
areaed_temp.append(str(cityName[i]))
areaed_temp.append(int(0 if np.isnan(city_confirmedCount[i]) else city_confirmedCount[i]))
areaed_temp.append(int(0 if np.isnan(city_curedCount[i]) else city_curedCount[i]))
areaed_temp.append(int(0 if np.isnan(city_deadCount[i]) else city_deadCount[i]))
areaed_tmp.append(areaed_temp)
# 处理provinceed(需要根据areaed获取)
province_temp = list()
for temp in areaed_tmp:
if(len(provinceed) == 0 and len(province_temp) == 0):
province_temp.append(temp[0])
province_temp.append(temp[2])
province_temp.append(temp[3])
province_temp.append(temp[4])
else:
if(temp[0] == province_temp[0]):
province_temp[1] = province_temp[1] + temp[2]
province_temp[1] = province_temp[2] + temp[3]
province_temp[1] = province_temp[3] + temp[4]
else:
provinceed.append(province_temp)
province_temp = list()
province_temp.append(temp[0])
province_temp.append(temp[2])
province_temp.append(temp[3])
province_temp.append(temp[4])
provinceed.append(province_temp)
print(数据清洗成功---------------)
这里没有什么说的,完全是体力活,将上面筛选出来的数据进行清洗,需要注意的是要仔细的观察读取出来的数据的数据格式,有些数据格式不是很标准,需要手动处理。
4. 将清洗的数据自动导入MySql
将数据导入Mysql这里还是使用python,使用了python的pymysql模块
import pymysql"""
将数据导入数据库
"""
# 打开数据库连接
db=pymysql.connect(host="localhost",user="root",password="123456",database="yq")
# 使用 cursor() 方法创建一个游标对象 cursor
cursor = db.cursor()
#创建yq数据库
cursor.execute(CREATE DATABASE IF NOT EXISTS yq DEFAULT CHARSET utf8 COLLATE utf8_general_ci;)
print(创建yq数据库成功)
#创建相关表表
cursor.execute(drop table if exists areaed)
sql="""
CREATE TABLE IF NOT EXISTS `areaed` (
`provinceName` varchar(20) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
`cityName` varchar(20) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
`confirmedCount` int(11) NULL DEFAULT NULL,
`deadCount` int(11) NULL DEFAULT NULL,
`curedCount` int(11) NULL DEFAULT NULL,
`currentCount` int(11) NULL DEFAULT NULL
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Compact;
"""
cursor.execute(sql)
cursor.execute(drop table if exists provinceed)
sql="""
CREATE TABLE `provinceed` (
`provinceName` varchar(20) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL,
`confirmedNum` varchar(20) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
`deathsNum` int(11) NULL DEFAULT NULL,
`curesNum` int(11) NULL DEFAULT NULL,
`currentNum` int(11) NULL DEFAULT NULL
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Compact;
"""
cursor.execute(sql)
cursor.execute(drop table if exists totaled)
sql="""
CREATE TABLE `totaled` (
`date` varchar(20) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
`diagnosed` int(11) NULL DEFAULT NULL,
`death` int(11) NULL DEFAULT NULL,
`cured` int(11) NULL DEFAULT NULL,
`current` int(11) NULL DEFAULT NULL
) ENGINE = MyISAM CHARACTER SET = latin1 COLLATE = latin1_swedish_ci ROW_FORMAT = Dynamic;
"""
cursor.execute(sql)
cursor.execute(drop table if exists historyed)
sql="""
CREATE TABLE `historyed` (
`date` varchar(20) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
`confirmedNum` int(11) NULL DEFAULT NULL,
`deathsNum` int(11) NULL DEFAULT NULL,
`curesNum` int(11) NULL DEFAULT NULL,
`currentNum` int(11) NULL DEFAULT NULL
) ENGINE = MyISAM CHARACTER SET = latin1 COLLATE = latin1_swedish_ci ROW_FORMAT = Dynamic;
"""
cursor.execute(sql)
print(创建相关表成功)
# 导入historyed
for item in historyed:
sql=INSERT INTO historyed VALUES(%s,"%s","%s","%s","%s")
try:
cursor.execute(sql,(str(item[0]),item[1],item[3],item[2],item[1]-item[2]-item[3]))
db.commit()
except Exception as ex:
print("error:")
print("出现如下异常%s"%ex)
db.rollback()
break
print("导入historyed成功-------------")
# 导入areaed
for item in areaed:
sql=INSERT INTO areaed VALUES(%s,"%s","%s","%s","%s","%s")
try:
cursor.execute(sql,(item[0],item[1],item[2],item[4],item[3],item[2]-item[3]-item[4]))
db.commit()
except Exception as ex:
print("error:")
print("出现如下异常%s"%ex)
db.rollback()
break
print("导入areaed成功-------------")
# 导入provinceed
for item in provinceed:
sql=INSERT INTO provinceed VALUES(%s,"%s","%s","%s","%s")
try:
cursor.execute(sql,(str(item[0]),item[1],item[3],item[2],item[1]-item[2]-item[3]))
db.commit()
except Exception as ex:
print("error:")
print("出现如下异常%s"%ex)
db.rollback()
break
print("导入provinceed成功-------------")
# 导入totaled
sql=INSERT INTO totaled VALUES(%s,"%s","%s","%s","%s")
try:
cursor.execute(sql,(str(totaled[0]),totaled[1],totaled[3],totaled[2],totaled[1]-totaled[2]-totaled[3]))
db.commit()
except Exception as ex:
print("error:")
print("出现如下异常%s"%ex)
db.rollback()
print("导入totaled成功-------------")
cursor.close()#先关闭游标
db.close()#再关闭数据库连接
这里为了脚本的使用方便,首先进行了建库、然后建表、最后将清洗的数据导入MySql
完整代码
import pandas as pdimport numpy as np
import pymysql
"""
@ProjectName: cleanData
@FileName: cleanData.py
@Author: tao
@Date: 2022/05/03
"""
# 读取的文件
filePath = "DXYArea.csv"
# 全国历史数据
historyed = list()
# 全国最新数据
totaled = list()
# province最新数据
provinceed = list()
# area最新数据
areaed = list()
# 获取数据
def read_csv_feature(filePath):
# 读取文件
f = open(filePath, encoding=utf-8)
reader = pd.read_csv(f, sep=,, iterator=True)
loop = True
chunkSize = 1000000
chunks = []
while loop:
try:
chunk = reader.get_chunk(chunkSize)
chunks.append(chunk)
except StopIteration:
loop = False
df = pd.concat(chunks, axis=0, ignore_index=True)
f.close()
return df
data = read_csv_feature(filePath)
print(数据读取成功---------------)
areaed_tmp = list()
countryName = np.array(data["countryName"])
countryEnglishName = np.array(data["countryEnglishName"])
provinceName = np.array(data["provinceName"])
province_confirmedCount = np.array(data["province_confirmedCount"])
province_curedCount = np.array(data["province_curedCount"])
province_deadCount = np.array(data["province_deadCount"])
updateTime = np.array(data["updateTime"])
cityName = np.array(data["cityName"])
city_confirmedCount = np.array(data["city_confirmedCount"])
city_curedCount = np.array(data["city_curedCount"])
city_deadCount = np.array(data["city_deadCount"])
for i in range(len(data)):
if(countryName[i] == "中国"):
updatetimeList = str(updateTime[i]).split( )
updatetime = updatetimeList[0]
# 处理historyed
historyed_temp = list()
if(provinceName[i] == "中国"):
# 处理totaled
if(len(totaled) == 0):
totaled.append(str(updateTime[i]))
totaled.append(int(province_confirmedCount[i]))
totaled.append(int(province_curedCount[i]))
totaled.append(int(province_deadCount[i]))
if((len(historyed) > 0) and (str(updatetime) != historyed[len(historyed) - 1][0])):
historyed_temp.append(str(updatetime))
historyed_temp.append(int(province_confirmedCount[i]))
historyed_temp.append(int(province_curedCount[i]))
historyed_temp.append(int(province_deadCount[i]))
if(len(historyed) == 0):
historyed_temp.append(str(updatetime))
historyed_temp.append(int(province_confirmedCount[i]))
historyed_temp.append(int(province_curedCount[i]))
historyed_temp.append(int(province_deadCount[i]))
if(len(historyed_temp) > 0):
historyed.append(historyed_temp)
# 处理areaed
areaed_temp = list()
if(provinceName[i] != "中国"):
if(provinceName[i] != "内蒙古自治区" and provinceName[i] != "黑龙江省"):
provinceName[i] = provinceName[i][0:2]
else:
provinceName[i] = provinceName[i][0:3]
flag = 1
for item in areaed:
if(item[1] == str(cityName[i])):
flag = 0
if(flag == 1):
areaed_temp.append(str(provinceName[i]))
areaed_temp.append(str(cityName[i]))
areaed_temp.append(int(0 if np.isnan(city_confirmedCount[i]) else city_confirmedCount[i]))
areaed_temp.append(int(0 if np.isnan(city_curedCount[i]) else city_curedCount[i]))
areaed_temp.append(int(0 if np.isnan(city_deadCount[i]) else city_deadCount[i]))
areaed.append(areaed_temp)
flag = 1
for item in areaed_tmp:
if(item[0] == str(provinceName[i])):
flag = 0
if(flag == 1):
areaed_temp.append(str(provinceName[i]))
areaed_temp.append(str(cityName[i]))
areaed_temp.append(int(0 if np.isnan(city_confirmedCount[i]) else city_confirmedCount[i]))
areaed_temp.append(int(0 if np.isnan(city_curedCount[i]) else city_curedCount[i]))
areaed_temp.append(int(0 if np.isnan(city_deadCount[i]) else city_deadCount[i]))
areaed_tmp.append(areaed_temp)
# 处理provinceed(需要根据areaed获取)
province_temp = list()
for temp in areaed_tmp:
if(len(provinceed) == 0 and len(province_temp) == 0):
province_temp.append(temp[0])
province_temp.append(temp[2])
province_temp.append(temp[3])
province_temp.append(temp[4])
else:
if(temp[0] == province_temp[0]):
province_temp[1] = province_temp[1] + temp[2]
province_temp[1] = province_temp[2] + temp[3]
province_temp[1] = province_temp[3] + temp[4]
else:
provinceed.append(province_temp)
province_temp = list()
province_temp.append(temp[0])
province_temp.append(temp[2])
province_temp.append(temp[3])
province_temp.append(temp[4])
provinceed.append(province_temp)
print(数据清洗成功---------------)
# print(historyed)
# print(areaed)
print(totaled)
# print(provinceed)
"""
print(len(provinceed))
for item in provinceed:
print(item[1]-item[2]-item[3])
"""
"""
将数据导入数据库
"""
# 打开数据库连接
db=pymysql.connect(host="localhost",user="root",password="123456",database="yq")
# 使用 cursor() 方法创建一个游标对象 cursor
cursor = db.cursor()
#创建yq数据库
cursor.execute(CREATE DATABASE IF NOT EXISTS yq DEFAULT CHARSET utf8 COLLATE utf8_general_ci;)
print(创建yq数据库成功)
#创建相关表表
cursor.execute(drop table if exists areaed)
sql="""
CREATE TABLE IF NOT EXISTS `areaed` (
`provinceName` varchar(20) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
`cityName` varchar(20) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
`confirmedCount` int(11) NULL DEFAULT NULL,
`deadCount` int(11) NULL DEFAULT NULL,
`curedCount` int(11) NULL DEFAULT NULL,
`currentCount` int(11) NULL DEFAULT NULL
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Compact;
"""
cursor.execute(sql)
cursor.execute(drop table if exists provinceed)
sql="""
CREATE TABLE `provinceed` (
`provinceName` varchar(20) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL,
`confirmedNum` varchar(20) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
`deathsNum` int(11) NULL DEFAULT NULL,
`curesNum` int(11) NULL DEFAULT NULL,
`currentNum` int(11) NULL DEFAULT NULL
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Compact;
"""
cursor.execute(sql)
cursor.execute(drop table if exists totaled)
sql="""
CREATE TABLE `totaled` (
`date` varchar(20) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
`diagnosed` int(11) NULL DEFAULT NULL,
`death` int(11) NULL DEFAULT NULL,
`cured` int(11) NULL DEFAULT NULL,
`current` int(11) NULL DEFAULT NULL
) ENGINE = MyISAM CHARACTER SET = latin1 COLLATE = latin1_swedish_ci ROW_FORMAT = Dynamic;
"""
cursor.execute(sql)
cursor.execute(drop table if exists historyed)
sql="""
CREATE TABLE `historyed` (
`date` varchar(20) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
`confirmedNum` int(11) NULL DEFAULT NULL,
`deathsNum` int(11) NULL DEFAULT NULL,
`curesNum` int(11) NULL DEFAULT NULL,
`currentNum` int(11) NULL DEFAULT NULL
) ENGINE = MyISAM CHARACTER SET = latin1 COLLATE = latin1_swedish_ci ROW_FORMAT = Dynamic;
"""
cursor.execute(sql)
print(创建相关表成功)
# 导入historyed
for item in historyed:
sql=INSERT INTO historyed VALUES(%s,"%s","%s","%s","%s")
try:
cursor.execute(sql,(str(item[0]),item[1],item[3],item[2],item[1]-item[2]-item[3]))
db.commit()
except Exception as ex:
print("error:")
print("出现如下异常%s"%ex)
db.rollback()
break
print("导入historyed成功-------------")
# 导入areaed
for item in areaed:
sql=INSERT INTO areaed VALUES(%s,"%s","%s","%s","%s","%s")
try:
cursor.execute(sql,(item[0],item[1],item[2],item[4],item[3],item[2]-item[3]-item[4]))
db.commit()
except Exception as ex:
print("error:")
print("出现如下异常%s"%ex)
db.rollback()
break
print("导入areaed成功-------------")
# 导入provinceed
for item in provinceed:
sql=INSERT INTO provinceed VALUES(%s,"%s","%s","%s","%s")
try:
cursor.execute(sql,(str(item[0]),item[1],item[3],item[2],item[1]-item[2]-item[3]))
db.commit()
except Exception as ex:
print("error:")
print("出现如下异常%s"%ex)
db.rollback()
break
print("导入provinceed成功-------------")
# 导入totaled
sql=INSERT INTO totaled VALUES(%s,"%s","%s","%s","%s")
try:
cursor.execute(sql,(str(totaled[0]),totaled[1],totaled[3],totaled[2],totaled[1]-totaled[2]-totaled[3]))
db.commit()
except Exception as ex:
print("error:")
print("出现如下异常%s"%ex)
db.rollback()
print("导入totaled成功-------------")
cursor.close()#先关闭游标
db.close()#再关闭数据库连接
脚本运行效果
数据库可以看到以下表和数据
最后我们的数据就已经有了,此时的数据处理的格式还是参照我之前整的新冠肺炎疫情的数据采集和可视化分析系统对接的,集体后台和可视化的实现可以参考:https://qkongtao.cn/?p=514
到此这篇关于python清洗疫情历史数据的文章就介绍到这了,更多相关python疫情历史数据内容请搜索盛行IT软件开发工作室以前的文章或继续浏览下面的相关文章希望大家以后多多支持盛行IT软件开发工作室!
郑重声明:本文由网友发布,不代表盛行IT的观点,版权归原作者所有,仅为传播更多信息之目的,如有侵权请联系,我们将第一时间修改或删除,多谢。