Использование многопроцессорной обработки с Pandas для чтения, изменения и записи тысяч CSV-файлов

#python #pandas #python-multiprocessing #python-multithreading

#python #pandas #python-многопроцессорная обработка #python-многопоточность

Вопрос:

Итак, у меня около 5000 файлов CSV в одном каталоге, который содержит данные о минутах акций. Каждый файл назван по своему символу. как и в случае с AAPL, AAPL называется AAPL.csv.

Я пытаюсь выполнить некоторую очистку и редактирование каждого из них. В этом случае я пытаюсь преобразовать один столбец, который содержит данные эпохи unix, в читаемую дату и время. Я также хочу изменить метку одного столбца.

Я пытаюсь использовать многопроцессорную обработку для ускорения процесса. Но сначала попробуйте просто убить мой Macbook.

Я запускаю его в записной книжке jupyter от VSCode. Если это имеет значение.

Интересно, что я сделал не так и как улучшить. И как обрабатывать аналогичные задачи в python и pandas.

Спасибо!

Вот мой код.

 # Define operations will be used in multiprocessing handling
def clean_up(file,fail_list):
    print('Working on {}'.format(file))
    stock = pd.read_csv('./Data/minutes_data/'   file)

    try:
        #Convert datetime columns into readable date and time column
        stock['Date'] = stock.apply(lambda row: epoch_converter.get_date_from_mill_epoch(row['datetime']), axis=1)
        stock['Time'] = stock.apply(lambda row: epoch_converter.get_time_from_mill_epoch(row['datetime']), axis=1)

        #Rename 'Unnamed: 0' column into 'Minute'
        stock.rename(columns={'Unnamed: 0':'Minute'}, inplace=True)

        #Write it back to new file
        stock.to_csv('./Data/working_data/'   file)
    except:
        print('{} not successful'.format(file))
        fail_list = fail_list.append(file)
        fail_list.to_csv('./failed_list.csv')



#Get file list to working on.
file_list = os.listdir('./Data/minutes_data/')

#prepare failed_list
fail_list = pd.DataFrame([])
#Loop through each file
processes = []
for file in file_list:
    p = multiprocessing.Process(target=clean_up, args=(file,fail_list,))
    processes.append(p)
    p.start()

for process in processes:
    process.join()
  

Обновление: CSV_FILE_SAMPLE

открыть, высокий, низкий, закрыть, объем, дата-время 0,21.9,21.9,21.9,21.9,200,1596722940000 0,20.0,20.0,19.9937,19.9937,200,1595266500000 1,20.0,20.0,19.9937,19.9937,500,1595266800000 2,20.0,20.0,19.9937,19.9937,1094,1595267040000 3,20.0,20.0,20.0,20.0,200,1595268240000

Окончательное обновление:

Объединив ответы от @furas и @jsmart, скрипту удалось сократить время обработки 5000 csv-файлов с часов до менее чем 1 минуты (до 6 core i9 на Macbook pro). Я счастлив. Вы, ребята, потрясающие. Спасибо!

Окончательные сценарии здесь:

 import pandas as pd
import numpy as np
import os
import multiprocessing
import logging

logging.basicConfig(filename='./log.log',level=logging.DEBUG)

file_list = os.listdir('./Data/minutes_data/')

def cleanup(file):
    print('Working on '   file)
    stock = pd.read_csv('./Data/minutes_data/'   file)
    
    try:
        #Convert datetime columns into readable date and time column
        stock['Date'] = pd.to_datetime(stock['datetime'],unit='ms',utc=True).dt.tz_convert('America/New_York').dt.date
        stock['Time'] = pd.to_datetime(stock['datetime'],unit='ms',utc=True).dt.tz_convert('America/New_York').dt.time

        #Rename 'Unnamed: 0' column into 'Minute'
        stock.rename(columns={'Unnamed: 0':'Minute'}, inplace=True)

        #Write it back to new file
        stock.to_csv('./Data/working_data/'   file)
    except:
        print(file   ' Not successful')
        logging.warning(file   ' Not complete.')



pool = multiprocessing.Pool()
pool.map(cleanup, file_list)
  

Комментарии:

1. вы получили сообщение об ошибке? что это было?

2. использование multiprocessing.Process you создает 5000 процессы одновременно. Лучше используйте ie. multiprocessing.Pool(10) для одновременного запуска только 10 процессов. И когда какой-либо процесс завершает работу, он использует его со следующим файлом в списке.

3. Сначала очень полезно составить профиль — это позволит вам узнать, где происходит замедление. Если он просто ожидает ввода-вывода файла, лучшим вариантом может быть пул потоков.

4. документация для многопроцессорной обработки даже показана Pool в первом примере.

5. с помощью Pool вы также можете использовать return для отправки файла с ошибкой в основной процесс, а затем он может его сохранить. Сохранение в одном и том же файле failed_list.csv в разных процессах может привести к неправильным результатам. Кроме того, процессы не используют общие переменные, и у каждого процесса будет своя копия empty failed_file , и он сохранит только одно значение и удалит предыдущее значение в файле './failed_list.csv' .

Ответ №1:

Используя Process в цикле, вы создаете 5000 процессов одновременно

Вы могли бы использовать Pool для управления одновременным количеством процессов — и это автоматически освободит процесс со следующим файлом.

Он также может использовать return для отправки имени неудачного файла в основной процесс и может сохранять файл один раз. Использование одного и того же файла во многих процессах может привести к неправильным данным в этом файле. Кроме того, процессы не используют общие переменные, и каждый процесс будет иметь собственный пустой фрейм данных, а позже сохранит только собственный неудачный файл — таким образом, он удалит предыдущее содержимое.

 def clean_up(file):
    # ... code ...
    
        return None  # if OK
    except:
        return file  # if failed
    
    
# --- main ---

# get file list to working on.
file_list = sorted(os.listdir('./Data/minutes_data/'))

with multiprocessing.Pool(10) as p:
    failed_files = p.map(clean_up, file_list)

# remove None from names
failed_files = filter(None, failed_files)

# save all
df = pd.DataFrame(failed_files)
df.to_csv('./failed_list.csv')
  

Существует также, multiprocessing.pool.ThreadPool который использует threads вместо processes .

Модуль concurrent.futures также ThreadPoolExecutor имеет и ProcessPoolExecutor

Вы также можете попробовать сделать это с помощью внешних модулей, но я не помню, что может быть полезным.

Комментарии:

1. Спасибо @furas! Я думаю, что я не понял, как работает многопроцессорная обработка в первую очередь. Ваш ответ очень полезен!

2. Я думал также о внешних модулях, таких как ray , dask , joblib но у меня нет опыта — см. Также 6 библиотек Python для параллельной обработки . В Linux я также могу использовать программу GNU parallel и скрипт, который работает с файлом siple и выполняет ls | parallel -n 10 python clean_up.py

Ответ №2:

В исходном сообщении спрашивалось «… как обрабатывать аналогичные задачи в python и pandas».

  • Замена .apply(..., axis=1) может увеличить пропускную способность в 100 раз или лучше.
  • Вот пример с 10_000 строками данных:
 %%timeit
df['date'] = df.apply(lambda x: pd.to_datetime(x['timestamp'], unit='ms'), axis=1)
792 ms ± 26.7 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
  

Переписать как:

 %%timeit
df['date'] = pd.to_datetime(df['date'], unit='ms')
4.88 ms ± 38.6 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
  

Пример данных:

 print(df['timestamp'].head())
0    1586863008214
1    1286654914895
2    1436424291218
3    1423512988135
4    1413205308057
Name: timestamp, dtype: int64
  

Комментарии:

1. Извините, но я не до конца понимаю ваше решение. Вы имеете в виду, что Pandas имеет встроенные функции для преобразования формата эпохи миллисекунд в читаемый формат даты и времени?

2. ДА. Аргумент unit='ms' указывает, что исходные значения указаны в миллисекундах с момента начала эпохи. И вы можете использовать .dt.date или .dt.time , чтобы указать только дату или только время. Документы здесь to_datetime

3. Ты сделал мой день лучше. Спасибо!