параллельные.фьючерсы вызывают ошибку типа?

#python #multiprocessing #concurrent.futures

Вопрос:

На случай, если кто-то еще увидит ту же проблему:

 ## The params should have been sent as a tuple
data = []
for file in files:
    data.append((file, loc, symbol, under_df))
with concurrent.futures.ProcessPoolExecutor() as executor:
    r = [executor.submit(process_file, data) for data in data]
 

Я хочу использовать concurrent.futures его, чтобы сократить время обработки для множества файлов. Я очень новичок в concurrent.futures этом деле .

Вот код:

 def process_file(file, loc, symbol, under_df):
        print(file)
        # Load the data
        fut_df = pd.read_csv('{}{}fut/{}'.format(loc, symbol, file), index_col=0)
        # drop duplicates
        fut_df.drop_duplicates(inplace=True)
        # Create expiry column
        fut_df['expiry'] = fut_df['contractname'].str.replace(symbol.upper(), '')
        fut_df['expiry'] = fut_df['expiry'].str.replace('FUT', '')
        fut_df['expiry'] = pd.to_datetime(fut_df['expiry'], format='%y%b')
        # Convert timestamp
        fut_df['timestamp'] = pd.to_datetime(fut_df['timestamp'])
        # get all the timestamps
        times = fut_df['timestamp'].tolist()
        # There is a bunch of code after this

with concurrent.futures.ProcessPoolExecutor() as executor:
    r = [executor.submit(process_file, [file, loc, symbol, under_df]) for file in files[0:2]]
 

[<Будущее в состоянии 0x7fd65a5b1860=ошибка поднятого типа завершена>, <Будущее в состоянии 0x7fd65a5b1860=ошибка поднятого типа завершена><Будущее в состоянии 0x7fd65a5dc6d8=ошибка поднятого типа завершена>]

Я попытался изменить код, чтобы проверить проблему:

 def process_file(file):
        print(file)

with concurrent.futures.ProcessPoolExecutor() as executor:
    r = [executor.submit(process_file, file) for file in files[0:2]]
 

Я все еще получаю ту же ошибку.

В чем здесь может быть проблема? process_file Функция работает без проблем.

Комментарии:

1. Можете ли вы поймать exc и распечатать что-нибудь в своем дочернем процессе?

Ответ №1:

Ты не появился files , и я думаю file , что это может быть что-то другое, чем ты ожидаешь.

 from concurrent.futures.process import ProcessPoolExecutor


def print_int(el: int):
    print(el**2)

lst = list(range(10))

with ProcessPoolExecutor() as executor:
    r = [executor.submit(print_int, el) for el in lst]
 

Это прекрасно работает и использует ваш пример кода без files .

Комментарии:

1. file это строковое имя файла. wipro_1.csv .

2. правильно ли я передаю несколько аргументов?

3. В последнем примере вы передаете только один аргумент, так что это должно быть хорошо.

4. Обнаружена проблема, это данные должны быть отправлены в виде tuple и столько кортежей, сколько файлов. Я опубликую код.