#python #multiprocessing #concurrent.futures
Вопрос:
На случай, если кто-то еще увидит ту же проблему:
## The params should have been sent as a tuple
data = []
for file in files:
data.append((file, loc, symbol, under_df))
with concurrent.futures.ProcessPoolExecutor() as executor:
r = [executor.submit(process_file, data) for data in data]
Я хочу использовать concurrent.futures
его, чтобы сократить время обработки для множества файлов. Я очень новичок в concurrent.futures
этом деле .
Вот код:
def process_file(file, loc, symbol, under_df):
print(file)
# Load the data
fut_df = pd.read_csv('{}{}fut/{}'.format(loc, symbol, file), index_col=0)
# drop duplicates
fut_df.drop_duplicates(inplace=True)
# Create expiry column
fut_df['expiry'] = fut_df['contractname'].str.replace(symbol.upper(), '')
fut_df['expiry'] = fut_df['expiry'].str.replace('FUT', '')
fut_df['expiry'] = pd.to_datetime(fut_df['expiry'], format='%y%b')
# Convert timestamp
fut_df['timestamp'] = pd.to_datetime(fut_df['timestamp'])
# get all the timestamps
times = fut_df['timestamp'].tolist()
# There is a bunch of code after this
with concurrent.futures.ProcessPoolExecutor() as executor:
r = [executor.submit(process_file, [file, loc, symbol, under_df]) for file in files[0:2]]
[<Будущее в состоянии 0x7fd65a5b1860=ошибка поднятого типа завершена>, <Будущее в состоянии 0x7fd65a5b1860=ошибка поднятого типа завершена><Будущее в состоянии 0x7fd65a5dc6d8=ошибка поднятого типа завершена>]
Я попытался изменить код, чтобы проверить проблему:
def process_file(file):
print(file)
with concurrent.futures.ProcessPoolExecutor() as executor:
r = [executor.submit(process_file, file) for file in files[0:2]]
Я все еще получаю ту же ошибку.
В чем здесь может быть проблема? process_file
Функция работает без проблем.
Комментарии:
1. Можете ли вы поймать exc и распечатать что-нибудь в своем дочернем процессе?
Ответ №1:
Ты не появился files
, и я думаю file
, что это может быть что-то другое, чем ты ожидаешь.
from concurrent.futures.process import ProcessPoolExecutor
def print_int(el: int):
print(el**2)
lst = list(range(10))
with ProcessPoolExecutor() as executor:
r = [executor.submit(print_int, el) for el in lst]
Это прекрасно работает и использует ваш пример кода без files
.
Комментарии:
1.
file
это строковое имя файла.wipro_1.csv
.2. правильно ли я передаю несколько аргументов?
3. В последнем примере вы передаете только один аргумент, так что это должно быть хорошо.
4. Обнаружена проблема, это данные должны быть отправлены в виде
tuple
и столько кортежей, сколько файлов. Я опубликую код.