В первые дни пандемии для специалистов по обработке данных и аналитиков было популярным упражнением по созданию информационных панелей для отслеживания COVID. Был запущен ряд общедоступных API, но один за другим эти сервисы переводятся в автономный режим. По состоянию на декабрь 2021 года репозиторий New York Times на GitHub остается одним из последних высококачественных источников данных COVID, но даже этот источник не является всеобъемлющим и может быть неудобным для использования в ваших проектах (он распространяется через CSV-файлы).
Несмотря на то, что агрегаторы смягчают публикацию кураторских данных, департаменты здравоохранения штатов продолжают тщательно сообщать статистику COVID по округам. Например, канал погоды использует департамент здравоохранения каждого штата в качестве источника данных при отображении текущей информации о COVID.
Вместо того, чтобы полагаться на посредников, таких как «Нью-Йорк Таймс», вы можете довольно легко написать свои собственные сценарии для получения непосредственно из штатов. В этой статье я покажу вам, как быстро получить данные о COVID в округах.
Мотивация
Несколько недель назад ко мне обратился клиент за советом о том, как автоматически отслеживать данные COVID-19 для определенных мест. В течение двух лет он вручную искал и вводил данные в большую электронную таблицу, и этот процесс отнимал значительное время у его реальной работы.
Я пообещал, что рассмотрю наши варианты, и очень быстро понял, что большинство популярных API-интерфейсов уже закрыты. Я начал с соскоба из «Нью-Йорк таймс«, хотя несколько нужных нам округов отсутствовали в наборе данных. Затем я подумал о том, чтобы очистить Канал Погоды, но потом меня осенило:
Почему я не могу просто получать данные непосредственно из штатов? В конце концов, это то, что в любом случае делают такие агрегаторы, как NYT.
Поэтому после нескольких часов исследований я написал несколько плагинов и собрал панель мониторинга Grafana (на фото выше), чтобы визуализировать свои результаты. Мой клиент был определенно доволен, поэтому я решил изложить свои выводы в этой статье!
Получение данных
Чтобы получить данные в Pandas, давайте использовать встроенную базу данных SQLite sql:local
для хранения наших данных. Вы всегда можете использовать свою собственную базу данных, например, если хотите создать собственные информационные панели Grafana.
Я опубликовал свои сценарии извлечения в виде плагина covid
Meerschaum ― формата модуля Python для извлечения данных.
Я предоставляю информацию об использовании в репозитории GitHub плагина, и ниже я более подробно расскажу о том, как я решил получить данные. Для начала сначала установите Meerschaum из PyPI и установите плагин. covid
$ pip install -U --user meerschaum
$ mrsm install plugin covid
Зарегистрируйте трубу с помощью соединителя plugin:covid
и на примере sql:local
, и при появлении запроса введите список кодов FIPS, соответствующих вашим округам.
$ mrsm register pipe -c plugin:covid -m cases -i sql:local
❓ Please enter a list of FIPS codes separated by commas: 08031,48113,45043,45045,45007,37107,37021,47157,47147
Теперь, когда мы настроили наши каналы, давайте запустим поток этих данных. Запустите sync pipes
команда для извлечения данных ― первый раз может занять минуту или около того.
mrsm sync pipes -i sql:local
Вот и все! Вы можете непрерывно синхронизировать с --loop
и --min-seconds
, и в -d
или --daemon
флаг запускает команду в качестве фонового задания.
mrsm sync pipes -i sql:local --loop --min-seconds 3600 -d
Нажмите на трубу
Вот фрагмент кода, демонстрирующий способы доступа к данным вашей трубы:
>>> import meerschaum as mrsm
>>>
>>> pipe = mrsm.Pipe('plugin:covid', 'cases', instance='sql:local')
>>>
>>> df = pipe.get_data()
>>> df
date fips cases
0 2021-12-22 48113 359035
1 2021-12-21 37021 31519
2 2021-12-21 37107 9997
3 2021-12-21 45007 38888
4 2021-12-21 45043 10114
... ... ... ...
5807 2020-03-08 48113 0
5808 2020-03-07 48113 0
5809 2020-03-06 48113 0
5810 2020-03-05 48113 0
5811 2020-03-04 48113 0
[5812 rows x 3 columns]
>>>
>>> ### Optionally specify begin / end datetimes, params, etc.
>>> df = pipe.get_data(begin='2021-01-01', params={'fips': ['48113', '37021']})
>>> df
date fips cases
0 2021-12-22 48113 359035
1 2021-12-21 37021 31519
2 2021-12-21 48113 358623
3 2021-12-20 37021 31454
4 2021-12-20 48113 357985
.. ... ... ...
706 2021-01-03 48113 172165
707 2021-01-02 37021 9676
708 2021-01-02 48113 172165
709 2021-01-01 37021 9195
710 2021-01-01 48113 172165
[711 rows x 3 columns]
>>>
>>> df.dtypes
date datetime64[ns]
fips object
cases int64
dtype: object
>>>
как это работает
Это та часть статьи, где я отодвигаю занавес и раскрываю свои секреты. То covid
сам плагин выполняет только другие плагины и объединяет их результаты; например, US-covid
плагин отрывки из «Нью-Йорк Таймс»:
def fetch(
pipe: Pipe,
begin: Optional[datetime.datetime],
end: Optional[datetime.datetime],
debug: bool = False,
**kw: Any
):
import pandas as pd
import duckdb
from meerschaum.utils.misc import wget
TMP_PATH.mkdir(parents=True, exist_ok=True)
all_filepath = TMP_PATH / 'us-counties.csv'
recent_filepath = TMP_PATH / 'us-counties-recent.csv'
fips = pipe.parameters['US-covid']['fips']
wget(RECENT_URL, recent_filepath, debug=debug)
recent_df = _get_df(recent_filepath, fips, begin, end)
st = pipe.get_sync_time(debug=debug)
if st is not None and len(recent_df) > 0 and min(recent_df['date']) <= st:
return recent_df
wget(ALL_URL, all_filepath, debug=debug)
return _get_df(all_filepath, fips, begin, end)
def _get_df(
csv_path: pathlib.Path,
fips: List[str],
begin: Optional[datetime.datetime],
end: Optional[datetime.datetime]
) -> 'pd.DataFrame':
import duckdb
dtypes = {
'date': 'datetime64[ms]',
'county': str,
'state': str,
'fips': str,
'cases': int,
'deaths': int,
}
fips_where = "'" + "', '".join(fips) + "'"
query = """
SELECT *
FROM read_csv(
'""" + str(csv_path) + """',
header = True,
columns = {
'date': 'DATE',
'county': 'VARCHAR',
'state': 'VARCHAR',
'fips': 'VARCHAR',
'cases': 'INT',
'deaths': 'INT'
}
)
WHERE fips IN (""" + fips_where + """)
"""
if begin is not None:
begin -= datetime.timedelta(days=2)
query += f"\n AND CAST(date AS DATE) >= CAST('{begin}' AS DATE)"
if end is not None:
query += f"\n AND CAST(date AS DATE) <= CAST('{end}' AS DATE)"
result = duckdb.query(query)
return result.df()[dtypes.keys()].astype(dtypes)
Извлечение данных из определенных штатов различается по сложности ― например, Калифорния предоставляет CSV, поэтому анализ аналогичен NYT:
def fetch(
pipe: meerschaum.Pipe,
begin: Optional[datetime.datetime] = None,
end: Optional[datetime.datetime] = None,
debug: bool = False,
**kw
):
from meerschaum.utils.misc import wget
import pandas as pd
import duckdb
import textwrap
TMP_PATH.mkdir(exist_ok=True, parents=True)
wget(CSV_URL, CSV_PATH)
dtypes = {
'date': 'datetime64[ms]',
'county': str,
'fips': str,
'cases': int,
'deaths': int,
}
fips = pipe.parameters['CA-covid']['fips']
fips_where = "'" + "', '".join(fips) + "'"
counties_df = pd.read_csv(COUNTIES_PATH, dtype={'fips': str, 'counties': str, 'state': str})
query = textwrap.dedent(f"""
SELECT
CAST(d.date AS DATE) AS date,
c.fips,
c.county,
d.cumulative_cases AS cases,
d.cumulative_deaths AS deaths
FROM read_csv_auto('{str(CSV_PATH)}') AS d
INNER JOIN counties_df AS c ON c.county = d.area
WHERE c.fips IN ({fips_where})
AND d.cumulative_deaths IS NOT NULL
AND d.cumulative_cases IS NOT NULL
AND d.date IS NOT NULL
AND c.fips IS NOT NULL"""
)
begin = begin if begin is not None else pipe.get_sync_time(debug=debug)
if begin is not None:
begin -= datetime.timedelta(days=2)
query += f"\n AND CAST(d.date AS DATE) >= CAST('{begin}' AS DATE)"
if end is not None:
query += f"\n AND CAST(d.date AS DATE) <= CAST('{end}' AS DATE)"
result = duckdb.query(query)
df = result.df()[dtypes.keys()].astype(dtypes)
return df
Но другие государства не так прямолинейны. TX-covid
Плагин загружает и анализирует электронную таблицу XLSX, содержащую комментарии, и добавляет новый столбец для каждого дня. Разбор этого эзотерического формата был огромной головной болью, что демонстрирует причину, по которой система плагинов Meerschaum настолько мощна.
def fetch(
pipe: meerschaum.Pipe,
begin: Optional[datetime.datetime] = None,
end: Optional[datetime.datetime] = None,
debug: bool = False,
**kw
):
import pandas as pd
from meerschaum.utils.misc import wget
from dateutil import parser
import datetime
import duckdb
import textwrap
from meerschaum.utils.debug import dprint
TMP_PATH.mkdir(exist_ok=True, parents=True)
fips = pipe.parameters['TX-covid']['fips']
fips_where = "'" + "', '".join(fips) + "'"
counties_df = pd.read_csv(COUNTIES_PATH, dtype={'fips': str, 'counties': str, 'state': str})
dtypes = {
'date': 'datetime64[ms]',
'county': str,
'fips': str,
'cases': int,
}
wget(XLSX_URL, XLSX_PATH, debug=debug)
df = pd.read_excel(XLSX_PATH, skiprows=[0], header=1, nrows=254)
data = {'date': [], 'county': [], 'cases': [],}
counties = list(df['County Name'])
begin = begin if begin is not None else (pipe.get_sync_time(debug=debug) if end is None else None)
if end is not None and begin is not None and end < begin:
begin = end - datetime.timedelta(days=1)
for col in df.columns[1:]:
date = parser.parse(col[len('Cases '):])
if begin is not None and date < begin:
continue
if end is not None and date > end:
break
for i, county in enumerate(counties):
data['date'].append(date)
data['county'].append(county)
data['cases'].append(df[col][i])
clean_df = pd.DataFrame(data).astype({col: typ for col, typ in dtypes.items() if col in data})
if debug:
print(clean_df)
query = textwrap.dedent(f"""
SELECT
CAST(d.date AS DATE) AS date,
c.fips,
c.county,
d.cases AS cases
FROM clean_df AS d
INNER JOIN counties_df AS c ON c.county = d.county
WHERE c.fips IN ({fips_where})
AND d.cases IS NOT NULL
AND d.date IS NOT NULL"""
)
if begin is not None:
begin -= datetime.timedelta(days=2)
query += f"\n AND CAST(d.date AS DATE) >= CAST('{begin}' AS DATE)"
if end is not None:
query += f"\n AND CAST(d.date AS DATE) <= CAST('{end}' AS DATE)"
if debug:
print(query)
joined_df = duckdb.query(query).df()[dtypes.keys()].astype(dtypes)
if debug:
print(joined_df)
return joined_df
Как и Калифорния, Джорджия распространяет данные в CSV, но сначала вам нужно извлечь ZIP-файл, чтобы получить наборы данных, которые вы ищете. Вот мое решение из GA-covid
плагин:
def fetch(
pipe: meerschaum.Pipe,
begin: Optional[datetime.datetime] = None,
end: Optional[datetime.datetime] = None,
debug: bool = False,
**kw
):
import zipfile, textwrap
from meerschaum.utils.misc import wget
import duckdb
import pandas as pd
import shutil
TMP_PATH.mkdir(exist_ok=True, parents=True)
if UNZIP_PATH.exists():
shutil.rmtree(UNZIP_PATH)
wget(ZIP_URL, ZIP_PATH)
with zipfile.ZipFile(ZIP_PATH, 'r') as zip_ref:
zip_ref.extractall(UNZIP_PATH)
dtypes = {
'date': 'datetime64[ms]',
'county': str,
'fips': str,
'cases': int,
'deaths': int,
}
fips = pipe.parameters['GA-covid']['fips']
fips_where = "'" + "', '".join(fips) + "'"
counties_df = pd.read_csv(COUNTIES_PATH, dtype={'fips': str, 'counties': str, 'state': str})
query = textwrap.dedent(f"""
SELECT
CAST(d.report_date AS DATE) AS date,
c.fips,
c.county,
CAST(d.cases_cum AS INT) AS cases,
CAST(d.death_cum AS INT) AS deaths
FROM read_csv_auto('{str(CSV_PATH)}') AS d
INNER JOIN counties_df AS c ON c.county = d.county
WHERE c.fips IN ({fips_where})
AND d.cases_cum IS NOT NULL
AND d.death_cum IS NOT NULL
AND d.report_date IS NOT NULL"""
)
begin = begin if begin is not None else pipe.get_sync_time(debug=debug)
if begin is not None:
begin -= datetime.timedelta(days=2)
query += f"\n AND CAST(d.report_date AS DATE) >= CAST('{begin}' AS DATE)"
if end is not None:
query += f"\n AND CAST(d.report_date AS DATE) <= CAST('{end}' AS DATE)"
result = duckdb.query(query)
df = result.df()[dtypes.keys()].astype(dtypes)
return df
Наконец, Колорадо предоставляет API RESTful, который чувствует себя гораздо менее напряженным, чем в других штатах (хотя API, похоже, не полностью функциональен, поэтому в нем есть доля раздражительности). Ниже приводится выдержка из CO-covid
плагин:
def fetch(
pipe: meerschaum.Pipe,
begin: Optional[datetime.datetime] = None,
end: Optional[datetime.datetime] = None,
debug: bool = False,
**kw
) -> Dict[str, Any]:
from dateutil import parser
import datetime, requests, pandas as pd
from meerschaum.utils.formatting import pprint
fips = pipe.parameters['CO-covid']['fips']
fips_where = "'" + "', '".join([f[2:] for f in fips]) + "'"
st = pipe.get_sync_time(debug=debug)
where = f"FIPS IN ({fips_where}) AND Metric IN ('Cases', 'Deaths')"
begin = begin if begin is not None else pipe.get_sync_time(debug=debug)
if begin is not None:
begin -= datetime.timedelta(days=2)
where += f" AND CAST(Date AS DATE) >= CAST(\'{begin.strftime('%m/%d/%Y')}\' AS DATE)"
if end is not None:
where += f" AND CAST(Date AS DATE) <= CAST(\'{end.strftime('%m/%d/%Y')}\' AS DATE)"
params = {
'where': where,
'outFields': 'COUNTY,FIPS,Metric,Value,Date',
'f': 'json',
}
if debug:
pprint(params)
final_data = {
'date': [],
'county': [],
'fips': [],
'cases': [],
'deaths': [],
}
data = requests.get(BASE_URL, params=params).json()
if debug:
pprint(data)
for i, row in enumerate(data['features']):
attrs = row['attributes']
if attrs['Metric'] == 'Cases':
continue
final_data['date'].append(parser.parse(attrs['Date']))
final_data['county'].append(attrs['COUNTY'].lower().capitalize())
final_data['fips'].append('08' + attrs['FIPS'])
final_data['deaths'].append(int(attrs['Value']))
final_data['cases'].append(int(data['features'][i + 1]['attributes']['Value']))
if debug:
pprint(final_data)
return final_data
Вывод
Пенка уже очень модульная, и covid
плагин работает аналогичным образом. Если вы хотите внести свой вклад, вы можете напишите и опубликуйте плагин для вашего штата и откройте пиар в covid
репозиторий плагинов.