You are currently viewing Руководство специалиста по обработке данных по сбору данных COVID-19 в 2022 году

Руководство специалиста по обработке данных по сбору данных COVID-19 в 2022 году

В первые дни пандемии для специалистов по обработке данных и аналитиков было популярным упражнением по созданию информационных панелей для отслеживания COVID. Был запущен ряд общедоступных API, но один за другим эти сервисы переводятся в автономный режим. По состоянию на декабрь 2021 года репозиторий New York Times на GitHub остается одним из последних высококачественных источников данных COVID, но даже этот источник не является всеобъемлющим и может быть неудобным для использования в ваших проектах (он распространяется через CSV-файлы).

Несмотря на то, что агрегаторы смягчают публикацию кураторских данных, департаменты здравоохранения штатов продолжают тщательно сообщать статистику COVID по округам. Например, канал погоды использует департамент здравоохранения каждого штата в качестве источника данных при отображении текущей информации о COVID.

Вместо того, чтобы полагаться на посредников, таких как «Нью-Йорк Таймс», вы можете довольно легко написать свои собственные сценарии для получения непосредственно из штатов. В этой статье я покажу вам, как быстро получить данные о COVID в округах.

Мотивация

Несколько недель назад ко мне обратился клиент за советом о том, как автоматически отслеживать данные COVID-19 для определенных мест. В течение двух лет он вручную искал и вводил данные в большую электронную таблицу, и этот процесс отнимал значительное время у его реальной работы.

Я пообещал, что рассмотрю наши варианты, и очень быстро понял, что большинство популярных API-интерфейсов уже закрыты. Я начал с соскоба из «Нью-Йорк таймс«, хотя несколько нужных нам округов отсутствовали в наборе данных. Затем я подумал о том, чтобы очистить Канал Погоды, но потом меня осенило:

Почему я не могу просто получать данные непосредственно из штатов? В конце концов, это то, что в любом случае делают такие агрегаторы, как NYT.

Поэтому после нескольких часов исследований я написал несколько плагинов и собрал панель мониторинга Grafana (на фото выше), чтобы визуализировать свои результаты. Мой клиент был определенно доволен, поэтому я решил изложить свои выводы в этой статье!

Получение данных

Чтобы получить данные в Pandas, давайте использовать встроенную базу данных SQLite sql:localдля хранения наших данных. Вы всегда можете использовать свою собственную базу данных, например, если хотите создать собственные информационные панели Grafana.

Я опубликовал свои сценарии извлечения в виде плагина covidMeerschaum ― формата модуля Python для извлечения данных.

Я предоставляю информацию об использовании в репозитории GitHub плагина, и ниже я более подробно расскажу о том, как я решил получить данные. Для начала сначала установите Meerschaum из PyPI и установите плагин. covid

$ pip install -U --user meerschaum
$ mrsm install plugin covid

Зарегистрируйте трубу с помощью соединителя plugin:covid и на примере sql:local, и при появлении запроса введите список кодов FIPS, соответствующих вашим округам.

$ mrsm register pipe -c plugin:covid -m cases -i sql:local
❓ Please enter a list of FIPS codes separated by commas: 08031,48113,45043,45045,45007,37107,37021,47157,47147

Теперь, когда мы настроили наши каналы, давайте запустим поток этих данных. Запустите sync pipes команда для извлечения данных ― первый раз может занять минуту или около того.

mrsm sync pipes -i sql:local

Вот и все! Вы можете непрерывно синхронизировать с --loop и --min-seconds, и в -d или --daemon флаг запускает команду в качестве фонового задания.

mrsm sync pipes -i sql:local --loop --min-seconds 3600 -d

Нажмите на трубу

Вот фрагмент кода, демонстрирующий способы доступа к данным вашей трубы:

>>> import meerschaum as mrsm
>>> 
>>> pipe = mrsm.Pipe('plugin:covid', 'cases', instance='sql:local')
>>> 
>>> df = pipe.get_data()
>>> df
           date   fips   cases
0    2021-12-22  48113  359035
1    2021-12-21  37021   31519
2    2021-12-21  37107    9997
3    2021-12-21  45007   38888
4    2021-12-21  45043   10114
...         ...    ...     ...
5807 2020-03-08  48113       0
5808 2020-03-07  48113       0
5809 2020-03-06  48113       0
5810 2020-03-05  48113       0
5811 2020-03-04  48113       0

[5812 rows x 3 columns]
>>> 
>>> ### Optionally specify begin / end datetimes, params, etc.
>>> df = pipe.get_data(begin='2021-01-01', params={'fips': ['48113', '37021']})
>>> df
          date   fips   cases
0   2021-12-22  48113  359035
1   2021-12-21  37021   31519
2   2021-12-21  48113  358623
3   2021-12-20  37021   31454
4   2021-12-20  48113  357985
..         ...    ...     ...
706 2021-01-03  48113  172165
707 2021-01-02  37021    9676
708 2021-01-02  48113  172165
709 2021-01-01  37021    9195
710 2021-01-01  48113  172165

[711 rows x 3 columns]
>>> 
>>> df.dtypes
date     datetime64[ns]
fips             object
cases             int64
dtype: object
>>> 

как это работает

Это та часть статьи, где я отодвигаю занавес и раскрываю свои секреты. То covid сам плагин выполняет только другие плагины и объединяет их результаты; например, US-covid плагин отрывки из «Нью-Йорк Таймс»:

def fetch(
        pipe: Pipe,
        begin: Optional[datetime.datetime],
        end: Optional[datetime.datetime],
        debug: bool = False,
        **kw: Any
    ):
    import pandas as pd
    import duckdb
    from meerschaum.utils.misc import wget

    TMP_PATH.mkdir(parents=True, exist_ok=True)
    all_filepath = TMP_PATH / 'us-counties.csv'
    recent_filepath = TMP_PATH / 'us-counties-recent.csv'
    fips = pipe.parameters['US-covid']['fips']

    wget(RECENT_URL, recent_filepath, debug=debug)
    recent_df = _get_df(recent_filepath, fips, begin, end)
    st = pipe.get_sync_time(debug=debug)
    if st is not None and len(recent_df) > 0 and min(recent_df['date']) <= st:
        return recent_df

    wget(ALL_URL, all_filepath, debug=debug)
    return _get_df(all_filepath, fips, begin, end)

def _get_df(
        csv_path: pathlib.Path,
        fips: List[str],
        begin: Optional[datetime.datetime],
        end: Optional[datetime.datetime]
    ) -> 'pd.DataFrame':
    import duckdb
    dtypes = {
        'date': 'datetime64[ms]',
        'county': str,
        'state': str,
        'fips': str,
        'cases': int,
        'deaths': int,
    }
    fips_where = "'" + "', '".join(fips) + "'"

    query = """
        SELECT *
        FROM read_csv(
            '""" + str(csv_path) + """',
            header = True,
            columns = {
                'date': 'DATE',
                'county': 'VARCHAR',
                'state': 'VARCHAR',
                'fips': 'VARCHAR',
                'cases': 'INT',
                'deaths': 'INT'
            }
        )
        WHERE fips IN (""" + fips_where + """)
    """
    if begin is not None:
        begin -= datetime.timedelta(days=2)
        query += f"\n    AND CAST(date AS DATE) >= CAST('{begin}' AS DATE)"
    if end is not None:
        query += f"\n    AND CAST(date AS DATE) <= CAST('{end}' AS DATE)"
    result = duckdb.query(query)
    return result.df()[dtypes.keys()].astype(dtypes)

Извлечение данных из определенных штатов различается по сложности ― например, Калифорния предоставляет CSV, поэтому анализ аналогичен NYT:

def fetch(
        pipe: meerschaum.Pipe,
        begin: Optional[datetime.datetime] = None,
        end: Optional[datetime.datetime] = None,
        debug: bool = False,
        **kw
    ):
    from meerschaum.utils.misc import wget
    import pandas as pd
    import duckdb
    import textwrap
    TMP_PATH.mkdir(exist_ok=True, parents=True)
    wget(CSV_URL, CSV_PATH)
    dtypes = {
        'date': 'datetime64[ms]',
        'county': str,
        'fips': str,
        'cases': int,
        'deaths': int,
    }
    fips = pipe.parameters['CA-covid']['fips']
    fips_where = "'" + "', '".join(fips) + "'"
    counties_df = pd.read_csv(COUNTIES_PATH, dtype={'fips': str, 'counties': str, 'state': str})

    query = textwrap.dedent(f"""
        SELECT
            CAST(d.date AS DATE) AS date, 
            c.fips,
            c.county,
            d.cumulative_cases AS cases,
            d.cumulative_deaths AS deaths
        FROM read_csv_auto('{str(CSV_PATH)}') AS d
        INNER JOIN counties_df AS c ON c.county = d.area
        WHERE c.fips IN ({fips_where})
            AND d.cumulative_deaths IS NOT NULL
            AND d.cumulative_cases IS NOT NULL
            AND d.date IS NOT NULL
            AND c.fips IS NOT NULL"""
    )
    begin = begin if begin is not None else pipe.get_sync_time(debug=debug)
    if begin is not None:
        begin -= datetime.timedelta(days=2)
        query += f"\n    AND CAST(d.date AS DATE) >= CAST('{begin}' AS DATE)"
    if end is not None:
        query += f"\n    AND CAST(d.date AS DATE) <= CAST('{end}' AS DATE)"

    result = duckdb.query(query)
    df = result.df()[dtypes.keys()].astype(dtypes)
    return df

Но другие государства не так прямолинейны. TX-covid Плагин загружает и анализирует электронную таблицу XLSX, содержащую комментарии, и добавляет новый столбец для каждого дня. Разбор этого эзотерического формата был огромной головной болью, что демонстрирует причину, по которой система плагинов Meerschaum настолько мощна.

def fetch(
        pipe: meerschaum.Pipe,
        begin: Optional[datetime.datetime] = None,
        end: Optional[datetime.datetime] = None,
        debug: bool = False,
        **kw
    ):
    import pandas as pd
    from meerschaum.utils.misc import wget
    from dateutil import parser
    import datetime
    import duckdb
    import textwrap
    from meerschaum.utils.debug import dprint
    TMP_PATH.mkdir(exist_ok=True, parents=True)
    fips = pipe.parameters['TX-covid']['fips']
    fips_where = "'" + "', '".join(fips) + "'"
    counties_df = pd.read_csv(COUNTIES_PATH, dtype={'fips': str, 'counties': str, 'state': str})

    dtypes = {
        'date': 'datetime64[ms]',
        'county': str,
        'fips': str,
        'cases': int,
    }

    wget(XLSX_URL, XLSX_PATH, debug=debug)
    df = pd.read_excel(XLSX_PATH, skiprows=[0], header=1, nrows=254)
    data = {'date': [], 'county': [], 'cases': [],}
    counties = list(df['County Name'])

    begin = begin if begin is not None else (pipe.get_sync_time(debug=debug) if end is None else None)
    if end is not None and begin is not None and end < begin:
        begin = end - datetime.timedelta(days=1)
    for col in df.columns[1:]:
        date = parser.parse(col[len('Cases '):])
        if begin is not None and date < begin:
            continue
        if end is not None and date > end:
            break
        for i, county in enumerate(counties):
            data['date'].append(date)
            data['county'].append(county)
            data['cases'].append(df[col][i])

    clean_df = pd.DataFrame(data).astype({col: typ for col, typ in dtypes.items() if col in data})
    if debug:
        print(clean_df)
    query = textwrap.dedent(f"""
    SELECT
        CAST(d.date AS DATE) AS date, 
        c.fips,
        c.county,
        d.cases AS cases
    FROM clean_df AS d
    INNER JOIN counties_df AS c ON c.county = d.county
    WHERE c.fips IN ({fips_where})
        AND d.cases IS NOT NULL
        AND d.date IS NOT NULL"""
    )
    if begin is not None:
        begin -= datetime.timedelta(days=2)
        query += f"\n    AND CAST(d.date AS DATE) >= CAST('{begin}' AS DATE)"
    if end is not None:
        query += f"\n    AND CAST(d.date AS DATE) <= CAST('{end}' AS DATE)"
    if debug:
        print(query)

    joined_df = duckdb.query(query).df()[dtypes.keys()].astype(dtypes)
    if debug:
        print(joined_df)
    return joined_df

Как и Калифорния, Джорджия распространяет данные в CSV, но сначала вам нужно извлечь ZIP-файл, чтобы получить наборы данных, которые вы ищете. Вот мое решение из GA-covid плагин:

def fetch(
        pipe: meerschaum.Pipe,
        begin: Optional[datetime.datetime] = None,
        end: Optional[datetime.datetime] = None,
        debug: bool = False,
        **kw
    ):
    import zipfile, textwrap
    from meerschaum.utils.misc import wget
    import duckdb
    import pandas as pd
    import shutil
    TMP_PATH.mkdir(exist_ok=True, parents=True)
    if UNZIP_PATH.exists():
        shutil.rmtree(UNZIP_PATH)
    wget(ZIP_URL, ZIP_PATH)
    with zipfile.ZipFile(ZIP_PATH, 'r') as zip_ref:
        zip_ref.extractall(UNZIP_PATH)

    dtypes = {
        'date': 'datetime64[ms]',
        'county': str,
        'fips': str,
        'cases': int,
        'deaths': int,
    }
    fips = pipe.parameters['GA-covid']['fips']
    fips_where = "'" + "', '".join(fips) + "'"
    counties_df = pd.read_csv(COUNTIES_PATH, dtype={'fips': str, 'counties': str, 'state': str})

    query = textwrap.dedent(f"""
        SELECT
            CAST(d.report_date AS DATE) AS date, 
            c.fips,
            c.county,
            CAST(d.cases_cum AS INT) AS cases,
            CAST(d.death_cum AS INT) AS deaths
        FROM read_csv_auto('{str(CSV_PATH)}') AS d
        INNER JOIN counties_df AS c ON c.county = d.county
        WHERE c.fips IN ({fips_where})
            AND d.cases_cum IS NOT NULL
            AND d.death_cum IS NOT NULL
            AND d.report_date IS NOT NULL"""
    )
    begin = begin if begin is not None else pipe.get_sync_time(debug=debug)
    if begin is not None:
        begin -= datetime.timedelta(days=2)
        query += f"\n    AND CAST(d.report_date AS DATE) >= CAST('{begin}' AS DATE)"
    if end is not None:
        query += f"\n    AND CAST(d.report_date AS DATE) <= CAST('{end}' AS DATE)"
    result = duckdb.query(query)
    df = result.df()[dtypes.keys()].astype(dtypes)
    return df

Наконец, Колорадо предоставляет API RESTful, который чувствует себя гораздо менее напряженным, чем в других штатах (хотя API, похоже, не полностью функциональен, поэтому в нем есть доля раздражительности). Ниже приводится выдержка из CO-covid плагин:

def fetch(
        pipe: meerschaum.Pipe,
        begin: Optional[datetime.datetime] = None,
        end: Optional[datetime.datetime] = None, 
        debug: bool = False,
        **kw
    ) -> Dict[str, Any]:
    from dateutil import parser
    import datetime, requests, pandas as pd
    from meerschaum.utils.formatting import pprint

    fips = pipe.parameters['CO-covid']['fips']
    fips_where = "'" + "', '".join([f[2:] for f in fips]) + "'"
    st = pipe.get_sync_time(debug=debug)
    where = f"FIPS IN ({fips_where}) AND Metric IN ('Cases', 'Deaths')"
    begin = begin if begin is not None else pipe.get_sync_time(debug=debug)
    if begin is not None:
        begin -= datetime.timedelta(days=2)
        where += f" AND CAST(Date AS DATE) >= CAST(\'{begin.strftime('%m/%d/%Y')}\' AS DATE)"
    if end is not None:
        where += f" AND CAST(Date AS DATE) <= CAST(\'{end.strftime('%m/%d/%Y')}\' AS DATE)"
    params = {
        'where': where,
        'outFields': 'COUNTY,FIPS,Metric,Value,Date',
        'f': 'json',
    }
    if debug:
        pprint(params)

    final_data = {
        'date': [],
        'county': [],
        'fips': [],
        'cases': [],
        'deaths': [],
    }

    data = requests.get(BASE_URL, params=params).json()
    if debug:
        pprint(data)
    for i, row in enumerate(data['features']):
        attrs = row['attributes']
        if attrs['Metric'] == 'Cases':
            continue
        final_data['date'].append(parser.parse(attrs['Date']))
        final_data['county'].append(attrs['COUNTY'].lower().capitalize())
        final_data['fips'].append('08' + attrs['FIPS'])
        final_data['deaths'].append(int(attrs['Value']))
        final_data['cases'].append(int(data['features'][i + 1]['attributes']['Value']))
    if debug:
        pprint(final_data)
    return final_data

Вывод

Пенка уже очень модульная, и covid плагин работает аналогичным образом. Если вы хотите внести свой вклад, вы можете напишите и опубликуйте плагин для вашего штата и откройте пиар в covid репозиторий плагинов.