#python #python-3.x #pandas #multiprocessing #python-multiprocessing
#python #python-3.x #pandas #многопроцессорная обработка #python-многопроцессорная обработка
Вопрос:
У меня есть набор истории цен на акции, содержит 8160 файлов в папке «Raw». Я использую pandas для внесения некоторых изменений в каждый из них, чем выводить новый файл в другую папку, «очищенную». Любой файл, который не удалось обработать, следует записать в папку с именем «Сбой».
Все файлы представляют собой csv.
После того, как я запустил приведенный ниже код, я обнаружил, что в разделе сбой нет файлов. Но около 100 файлов отсутствуют. «Чистая» папка содержит только 8060 файлов.
Я проверил разницу и выяснил, какие файлы отсутствуют. Я обнаружил, что все файлы имеют правильный формат и могут быть обработаны нормально, если я сделаю это с помощью цикла for. Поэтому я подозреваю, что, возможно, я неправильно использовал многопроцессорность.
Вот коды.
def cleanup(file):
print('Working on ' file)
stock = pd.read_csv(raw_folder_path file)
try:
del stock['Unnamed: 0']
stock['Date'] = pd.to_datetime(stock['datetime'],unit='ms',utc=True).dt.tz_convert('America/New_York').dt.date
stock.to_csv(clean_folder_path file)
except:
print(file ' Not Successfull')
stock.to_csv(failed_folder_path file)
#run multiprocessing
files_list = os.listdir(raw_folder_path)
pool = multiprocessing.Pool()
pool.map(cleanup, files_list)
Я надеюсь, что кто-нибудь сможет объяснить мне, где я ошибся в этом? Спасибо!
Комментарии:
1. В случае
stock.to_csv
сбоя он может не записывать ни в чистые, ни в сбойные папки.2. Кстати, основной процесс должен запускать весь свой код в
if __name__ == '__main__':
блоке. Так ли это для вас? Потому что все дочерние процессы выполняют тот же сценарий, что и основные, за исключением того, что дочерние__name__
процессы отличаются от__main__
.3. Также вы можете попытаться эмулировать преднамеренное исключение в одном из дочерних процессов и попытаться увидеть, перехватывает ли основной процесс и сообщает об этом исключении. Возможно, основной процесс не уловил какую-либо ошибку дочернего процесса. Это может объяснить, что запись csv игнорировалась в обеих папках молча. Если main не сообщает об исключениях при использовании
map()
, вам следует использовать прокси-сервер Manager для обмена исключениями и другой информацией между дочерним и основным процессами.4. Также правильный способ использования пула находится внутри
with
оператора, подобного этомуwith multiprocessing.Pool() as pool: pool.map(cleanup, files_list)
. В противном случае пул не закрывается, и в некоторых случаях процессы не завершаются. Также ошибки могут оставаться незамеченными.5. Также после обеих
to_csv(...)
строк вы можете добавить утверждениеassert os.path.exists(csv_path)
, чтобы быть уверенным, что csv-файл был создан, а не пропускал ошибки молча. Тогдаassert
наверняка будет создано исключение, которое будет перехвачено main в случае, если.map()
действительно сообщает обо всех исключениях.
Ответ №1:
Я поместил рекомендации в комментарии к вопросу. Окончательный код, вероятно, должен выглядеть следующим образом:
import multiprocessing, os, datetime
import pandas as pd
raw_folder_path = './input/'
clean_folder_path = './clean/'
failed_folder_path = './failed/'
def cleanup(file):
print('Working on ' file)
stock = pd.read_csv(raw_folder_path file)
try:
del stock['Unnamed: 0']
stock['Date'] = pd.to_datetime(stock['datetime'],unit='ms',utc=True).dt.tz_convert('America/New_York').dt.date
stock.to_csv(clean_folder_path file)
assert os.path.exists(clean_folder_path file)
return True
except Exception as ex:
print(file ' Not Successfull ', ex)
stock.to_csv(failed_folder_path file)
assert os.path.exists(failed_folder_path file)
return False
if __name__ == '__main__':
files_list = os.listdir(raw_folder_path)
with multiprocessing.Pool() as pool:
results = pool.map(cleanup, files_list)
assert all(results), f'{len(results) - sum(results)} wrong results!'
assert len(results) == len(files_list)
Комментарии:
1. Спасибо @Arty. Ваш код чистый и красивый. Очень полезно для меня. Спасибо за предоставление этого высококачественного ответа!