Пропущенный / потерянный файл выводится при использовании многопроцессорной обработки с python и pandas

#python #python-3.x #pandas #multiprocessing #python-multiprocessing

#python #python-3.x #pandas #многопроцессорная обработка #python-многопроцессорная обработка

Вопрос:

У меня есть набор истории цен на акции, содержит 8160 файлов в папке «Raw». Я использую pandas для внесения некоторых изменений в каждый из них, чем выводить новый файл в другую папку, «очищенную». Любой файл, который не удалось обработать, следует записать в папку с именем «Сбой».

Все файлы представляют собой csv.

После того, как я запустил приведенный ниже код, я обнаружил, что в разделе сбой нет файлов. Но около 100 файлов отсутствуют. «Чистая» папка содержит только 8060 файлов.

Я проверил разницу и выяснил, какие файлы отсутствуют. Я обнаружил, что все файлы имеют правильный формат и могут быть обработаны нормально, если я сделаю это с помощью цикла for. Поэтому я подозреваю, что, возможно, я неправильно использовал многопроцессорность.

Вот коды.

 def cleanup(file):
    print('Working on '   file)
    stock = pd.read_csv(raw_folder_path   file)

    try:
        del stock['Unnamed: 0']
        stock['Date'] = pd.to_datetime(stock['datetime'],unit='ms',utc=True).dt.tz_convert('America/New_York').dt.date
        stock.to_csv(clean_folder_path   file)
    except:
        print(file   ' Not Successfull')
        stock.to_csv(failed_folder_path   file)

#run multiprocessing
files_list = os.listdir(raw_folder_path)
pool = multiprocessing.Pool()
pool.map(cleanup, files_list)
  

Я надеюсь, что кто-нибудь сможет объяснить мне, где я ошибся в этом? Спасибо!

Комментарии:

1. В случае stock.to_csv сбоя он может не записывать ни в чистые, ни в сбойные папки.

2. Кстати, основной процесс должен запускать весь свой код в if __name__ == '__main__': блоке. Так ли это для вас? Потому что все дочерние процессы выполняют тот же сценарий, что и основные, за исключением того, что дочерние __name__ процессы отличаются от __main__ .

3. Также вы можете попытаться эмулировать преднамеренное исключение в одном из дочерних процессов и попытаться увидеть, перехватывает ли основной процесс и сообщает об этом исключении. Возможно, основной процесс не уловил какую-либо ошибку дочернего процесса. Это может объяснить, что запись csv игнорировалась в обеих папках молча. Если main не сообщает об исключениях при использовании map() , вам следует использовать прокси-сервер Manager для обмена исключениями и другой информацией между дочерним и основным процессами.

4. Также правильный способ использования пула находится внутри with оператора, подобного этому with multiprocessing.Pool() as pool: pool.map(cleanup, files_list) . В противном случае пул не закрывается, и в некоторых случаях процессы не завершаются. Также ошибки могут оставаться незамеченными.

5. Также после обеих to_csv(...) строк вы можете добавить утверждение assert os.path.exists(csv_path) , чтобы быть уверенным, что csv-файл был создан, а не пропускал ошибки молча. Тогда assert наверняка будет создано исключение, которое будет перехвачено main в случае, если .map() действительно сообщает обо всех исключениях.

Ответ №1:

Я поместил рекомендации в комментарии к вопросу. Окончательный код, вероятно, должен выглядеть следующим образом:

Попробуйте онлайн здесь!

 import multiprocessing, os, datetime
import pandas as pd

raw_folder_path = './input/'
clean_folder_path = './clean/'
failed_folder_path = './failed/'

def cleanup(file):
    print('Working on '   file)
    stock = pd.read_csv(raw_folder_path   file)

    try:
        del stock['Unnamed: 0']
        stock['Date'] = pd.to_datetime(stock['datetime'],unit='ms',utc=True).dt.tz_convert('America/New_York').dt.date
        stock.to_csv(clean_folder_path   file)
        assert os.path.exists(clean_folder_path   file)
        return True
    except Exception as ex:
        print(file   ' Not Successfull ', ex)
        stock.to_csv(failed_folder_path   file)
        assert os.path.exists(failed_folder_path   file)
        return False

if __name__ == '__main__':
    files_list = os.listdir(raw_folder_path)
    with multiprocessing.Pool() as pool:
        results = pool.map(cleanup, files_list)
    assert all(results), f'{len(results) - sum(results)} wrong results!'
    assert len(results) == len(files_list)
  

Комментарии:

1. Спасибо @Arty. Ваш код чистый и красивый. Очень полезно для меня. Спасибо за предоставление этого высококачественного ответа!