#python #pandas #python-multiprocessing #python-multithreading
#python #pandas #python-многопроцессорная обработка #python-многопоточность
Вопрос:
Итак, у меня около 5000 файлов CSV в одном каталоге, который содержит данные о минутах акций. Каждый файл назван по своему символу. как и в случае с AAPL, AAPL называется AAPL.csv.
Я пытаюсь выполнить некоторую очистку и редактирование каждого из них. В этом случае я пытаюсь преобразовать один столбец, который содержит данные эпохи unix, в читаемую дату и время. Я также хочу изменить метку одного столбца.
Я пытаюсь использовать многопроцессорную обработку для ускорения процесса. Но сначала попробуйте просто убить мой Macbook.
Я запускаю его в записной книжке jupyter от VSCode. Если это имеет значение.
Интересно, что я сделал не так и как улучшить. И как обрабатывать аналогичные задачи в python и pandas.
Спасибо!
Вот мой код.
# Define operations will be used in multiprocessing handling
def clean_up(file,fail_list):
print('Working on {}'.format(file))
stock = pd.read_csv('./Data/minutes_data/' file)
try:
#Convert datetime columns into readable date and time column
stock['Date'] = stock.apply(lambda row: epoch_converter.get_date_from_mill_epoch(row['datetime']), axis=1)
stock['Time'] = stock.apply(lambda row: epoch_converter.get_time_from_mill_epoch(row['datetime']), axis=1)
#Rename 'Unnamed: 0' column into 'Minute'
stock.rename(columns={'Unnamed: 0':'Minute'}, inplace=True)
#Write it back to new file
stock.to_csv('./Data/working_data/' file)
except:
print('{} not successful'.format(file))
fail_list = fail_list.append(file)
fail_list.to_csv('./failed_list.csv')
#Get file list to working on.
file_list = os.listdir('./Data/minutes_data/')
#prepare failed_list
fail_list = pd.DataFrame([])
#Loop through each file
processes = []
for file in file_list:
p = multiprocessing.Process(target=clean_up, args=(file,fail_list,))
processes.append(p)
p.start()
for process in processes:
process.join()
Обновление: CSV_FILE_SAMPLE
открыть, высокий, низкий, закрыть, объем, дата-время 0,21.9,21.9,21.9,21.9,200,1596722940000 0,20.0,20.0,19.9937,19.9937,200,1595266500000 1,20.0,20.0,19.9937,19.9937,500,1595266800000 2,20.0,20.0,19.9937,19.9937,1094,1595267040000 3,20.0,20.0,20.0,20.0,200,1595268240000
Окончательное обновление:
Объединив ответы от @furas и @jsmart, скрипту удалось сократить время обработки 5000 csv-файлов с часов до менее чем 1 минуты (до 6 core i9 на Macbook pro). Я счастлив. Вы, ребята, потрясающие. Спасибо!
Окончательные сценарии здесь:
import pandas as pd
import numpy as np
import os
import multiprocessing
import logging
logging.basicConfig(filename='./log.log',level=logging.DEBUG)
file_list = os.listdir('./Data/minutes_data/')
def cleanup(file):
print('Working on ' file)
stock = pd.read_csv('./Data/minutes_data/' file)
try:
#Convert datetime columns into readable date and time column
stock['Date'] = pd.to_datetime(stock['datetime'],unit='ms',utc=True).dt.tz_convert('America/New_York').dt.date
stock['Time'] = pd.to_datetime(stock['datetime'],unit='ms',utc=True).dt.tz_convert('America/New_York').dt.time
#Rename 'Unnamed: 0' column into 'Minute'
stock.rename(columns={'Unnamed: 0':'Minute'}, inplace=True)
#Write it back to new file
stock.to_csv('./Data/working_data/' file)
except:
print(file ' Not successful')
logging.warning(file ' Not complete.')
pool = multiprocessing.Pool()
pool.map(cleanup, file_list)
Комментарии:
1. вы получили сообщение об ошибке? что это было?
2. использование
multiprocessing.Process
you создает5000
процессы одновременно. Лучше используйте ie.multiprocessing.Pool(10)
для одновременного запуска только10
процессов. И когда какой-либо процесс завершает работу, он использует его со следующим файлом в списке.3. Сначала очень полезно составить профиль — это позволит вам узнать, где происходит замедление. Если он просто ожидает ввода-вывода файла, лучшим вариантом может быть пул потоков.
4. документация для многопроцессорной обработки даже показана
Pool
в первом примере.5. с помощью
Pool
вы также можете использоватьreturn
для отправки файла с ошибкой в основной процесс, а затем он может его сохранить. Сохранение в одном и том же файлеfailed_list.csv
в разных процессах может привести к неправильным результатам. Кроме того, процессы не используют общие переменные, и у каждого процесса будет своя копия emptyfailed_file
, и он сохранит только одно значение и удалит предыдущее значение в файле'./failed_list.csv'
.
Ответ №1:
Используя Process
в цикле, вы создаете 5000 процессов одновременно
Вы могли бы использовать Pool
для управления одновременным количеством процессов — и это автоматически освободит процесс со следующим файлом.
Он также может использовать return
для отправки имени неудачного файла в основной процесс и может сохранять файл один раз. Использование одного и того же файла во многих процессах может привести к неправильным данным в этом файле. Кроме того, процессы не используют общие переменные, и каждый процесс будет иметь собственный пустой фрейм данных, а позже сохранит только собственный неудачный файл — таким образом, он удалит предыдущее содержимое.
def clean_up(file):
# ... code ...
return None # if OK
except:
return file # if failed
# --- main ---
# get file list to working on.
file_list = sorted(os.listdir('./Data/minutes_data/'))
with multiprocessing.Pool(10) as p:
failed_files = p.map(clean_up, file_list)
# remove None from names
failed_files = filter(None, failed_files)
# save all
df = pd.DataFrame(failed_files)
df.to_csv('./failed_list.csv')
Существует также, multiprocessing.pool.ThreadPool
который использует threads
вместо processes
.
Модуль concurrent.futures также ThreadPoolExecutor
имеет и ProcessPoolExecutor
Вы также можете попробовать сделать это с помощью внешних модулей, но я не помню, что может быть полезным.
Комментарии:
1. Спасибо @furas! Я думаю, что я не понял, как работает многопроцессорная обработка в первую очередь. Ваш ответ очень полезен!
2. Я думал также о внешних модулях, таких как
ray
,dask
,joblib
но у меня нет опыта — см. Также 6 библиотек Python для параллельной обработки . В Linux я также могу использовать программу GNU parallel и скрипт, который работает с файлом siple и выполняетls | parallel -n 10 python clean_up.py
Ответ №2:
В исходном сообщении спрашивалось «… как обрабатывать аналогичные задачи в python и pandas».
- Замена
.apply(..., axis=1)
может увеличить пропускную способность в 100 раз или лучше. - Вот пример с 10_000 строками данных:
%%timeit
df['date'] = df.apply(lambda x: pd.to_datetime(x['timestamp'], unit='ms'), axis=1)
792 ms ± 26.7 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
Переписать как:
%%timeit
df['date'] = pd.to_datetime(df['date'], unit='ms')
4.88 ms ± 38.6 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
Пример данных:
print(df['timestamp'].head())
0 1586863008214
1 1286654914895
2 1436424291218
3 1423512988135
4 1413205308057
Name: timestamp, dtype: int64
Комментарии:
1. Извините, но я не до конца понимаю ваше решение. Вы имеете в виду, что Pandas имеет встроенные функции для преобразования формата эпохи миллисекунд в читаемый формат даты и времени?
2. ДА. Аргумент
unit='ms'
указывает, что исходные значения указаны в миллисекундах с момента начала эпохи. И вы можете использовать.dt.date
или.dt.time
, чтобы указать только дату или только время. Документы здесь to_datetime3. Ты сделал мой день лучше. Спасибо!