Асинхронная загрузка файла Python синтаксический анализ вывод в JSON

#python #python-requests #python-multiprocessing

#python #python-запросы #python-многопроцессорность

Вопрос:

Чтобы кратко объяснить контекст, я загружаю данные проспекта SEC, например. После загрузки я хочу проанализировать файл, чтобы извлечь определенные данные, затем вывести проанализированный словарь в файл JSON, который состоит из списка словарей. Я бы использовал базу данных SQL для вывода, но администраторы исследовательского кластера в моем университете медленно получают мне доступ. Если у кого-нибудь есть какие-либо предложения о том, как сохранить данные для удобства чтения / записи позже, я был бы признателен, я думал о HDF5 в качестве возможной альтернативы.

Минимальный пример того, что я делаю с местами, которые, как я думаю, мне нужно улучшить, помечены.

 def classify_file(doc):
    try:
        data = {
            'link': doc.url
        }
    except AttributeError:
        return {'flag': 'ATTRIBUTE ERROR'}
    # Do a bunch of parsing using regular expressions

if __name__=="__main__":
    items = list()
    for d in tqdm([y   ' '   q for y in ['2019'] for q in ['1']]):
        stream = os.popen('bash ./getformurls.sh '   d)
        stacked = stream.read().strip().split('n')
        # split each line into the fixed-width fields
        widths=(12,62,12,12,44)
        items  = [[item[sum(widths[:j]):sum(widths[:j 1])].strip() for j in range(len(widths))] for item in stacked]
    urls = [BASE_URL   item[4] for item in items]

    resp = list()
    # PROBLEM 1
    filelimit = 100
    for i in range(ceil(len(urls)/filelimit)):
        print(f'Downloading: {i*filelimit/len(urls)*100:2.0f}%...   ',end='r',flush=True)
        resp  = [r for r in grequests.map((grequests.get(u) for u in urls[i*filelimit:(i 1)*filelimit]))]

    # PROBLEM 2
    with Pool() as p:
        rs = p.map_async(classify_file,resp,chunksize=20)
        rs.wait()
        prospectus = rs.get()
    with open('prospectus_data.json') as f:
        json.dump(prospectus,f)
  

The getfileurls.sh упоминается написанный мной скрипт bash, который был быстрее, чем выполнение на python, поскольку я мог использовать grep, код для этого

 #!/bin/bash
BASE_URL="https://www.sec.gov/Archives/"
INDEX="edgar/full-index/"

url="${BASE_URL}${INDEX}$1/QTR$2/form.idx"
out=$(curl -s ${url} | grep "^485[A|B]POS")
echo "$out"
  

ПРОБЛЕМА 1: Итак, в настоящее время я извлекаю около 18 тысяч файлов в вызове grequests map. Я столкнулся с ошибкой, связанной со слишком большим количеством открытых файлов, поэтому я решил разделить список URL-адресов на управляемые фрагменты. Мне не нравится это решение, но оно работает.

ПРОБЛЕМА 2: вот где моя фактическая ошибка. Этот код отлично работает на меньшем наборе URL-адресов (~ 2k) на моем ноутбуке (использует 100% моего процессора и ~ 20 ГБ оперативной памяти ~ 10 ГБ для загрузки файлов и еще ~ 10 ГБ при запуске синтаксического анализа), но когда я переношу его на больший набор данных 18k, используя 40 ядер в исследовательском кластере, он загружается до ~ 100 ГБ оперативной памяти и ~ 3 ТБ подкачки, а затем вылетает после разбора около 2 тыс. документов за 20 минут с помощью KeyboardInterrupt с сервера.

Я действительно не понимаю, почему использование подкачки становится таким сумасшедшим, но я думаю, что мне действительно просто нужна помощь с управлением памятью здесь. Есть ли способ создать генератор неотправленных запросов, которые будут отправлены, когда я вызову classify_file() для них позже? Любая помощь была бы оценена.

Комментарии:

1. Для задачи 1 grequests.map имеет параметр size: размер: определяет количество запросов, которые необходимо выполнить одновременно. Если нет, регулирование не происходит.

Ответ №1:

Как правило, когда у вас происходит чрезмерное использование памяти с помощью Pool , это происходит потому, что рабочие элементы используются повторно и накапливают память с каждой итерацией. Вы можете иногда закрывать и повторно открывать пул, чтобы предотвратить это, но это настолько распространенная проблема, что в Python теперь есть встроенный параметр, который сделает это за вас…

Пул (… maxtasksperchild ) — это количество задач, которые рабочий процесс может выполнить, прежде чем он завершит работу и будет заменен новым рабочим процессом, чтобы освободить неиспользуемые ресурсы. Значение maxtasksperchild по умолчанию равно None, что означает, что рабочие процессы будут жить столько же, сколько и пул.

Я не могу сказать вам, какое значение является правильным, но обычно вы хотите установить его достаточно низким, чтобы ресурсы могли освобождаться довольно часто, но не настолько низким, чтобы это замедляло работу. (Возможно, обработка займет несколько минут… просто как предположение)

 with Pool(maxtasksperchild=5) as p:
    rs = p.map_async(classify_file,resp,chunksize=20)
    rs.wait()
    prospectus = rs.get()
  

Для вашей первой проблемы вы могли бы рассмотреть возможность просто использовать requests и переместить вызов внутри рабочего процесса, который у вас уже есть. Извлечение URL-адресов на сумму 18 тыс. и первоначальное кэширование всех этих данных потребует времени и памяти. Если все это инкапсулировано в worker, вы сведете к минимуму использование данных, и вам не нужно будет запускать так много дескрипторов открытых файлов.

Комментарии:

1. maxtasksperchild устраняет проблему. Я переместил запросы в функцию, вызываемую пулом. Было бы здорово иметь возможность определять асинхронный def classify_file(x), чтобы дочерний рабочий переходил к следующему документу во время загрузки первого. Я считаю, что это сократило бы время выполнения вдвое, поскольку загрузка файла занимает примерно столько же времени, сколько его фактическая обработка. Я не уверен, есть ли способ сделать это.

2. Я бы просто установил количество процессов в пуле на большее число (возможно, в 2 раза больше количества процессоров). Сначала они могут начать все загрузки вместе, а затем обработку, но после раунда или около того я бы ожидал, что все немного пошатнется, поэтому некоторые процессы загружаются одновременно с обработкой других. Если вы можете установить пул (N) на некоторое число, чтобы все ваши процессоры были почти максимальными, вы будете знать, что оптимизированы настолько хорошо, насколько это возможно.