#python #tensorflow #keras #parallel-processing #joblib
Вопрос:
У меня есть класс, в котором я создаю модель Keras для выполнения предсказаний. Этот класс организован примерно так:
class MyClass():
def __init__(self):
self.model = None
def load(path):
self.model = tf.keras.models.load_model(path_)
def inference(data):
#...
pred = self.model.predict(data)
#...
return pred
Я пытался запустить этот MyClass.inference
метод параллельно. Я попробовал это с joblib.Parallel
:
from joblib import Parallel, delayed
n_jobs = 8
myobj = MyClass()
myobj.load(<Path_to_model>)
results = Parallel(n_jobs=n_jobs )(delayed(myobj.inference)(d) for d in mydata))
Но я получаю следующую ошибку: TypeError: cannot pickle 'weakref' object
По-видимому, это известная проблема с Керасом (https://github.com/tensorflow/tensorflow/issues/34697), это должно было быть исправлено в TF 2.6.0. Но после обновления tensorflow до версии 2.6.0 я все равно получаю ту же ошибку. Я даже пробовал tf-каждый вечер, как предлагалось в том же выпуске, но и это не сработало.
Я также попытался заменить pickle
на dill
, на import dill as pickle
, но это не исправило этого.
Единственное, что действительно сработало, — это замена loky
бэкенда на Parallel
threading
. Однако в одном сценарии, который я попытался использовать threading
, в конечном итоге заняло почти столько же времени (или немного медленнее), MyClass.inference
что и последовательное выполнение вызовов.
Мой вопрос таков: каковы мои варианты здесь? Есть ли какой-либо способ параллельного запуска предварительно загруженной модели keras predict
, например, с другими библиотеками python?
Комментарии:
1. В Python многопроцессорная обработка является по своей природе непостоянна :/ что часто работает в качестве обходного пути против странные мариновать вопросы-это вещь, которую вы хотите запустить параллельно автономная функции верхнего уровня, который «самодостаточен», потому что пытается работать параллельно предполагает маринование в соответствующем контексте (в
mymodel
класс в вашем случае).2. Я не понимаю вопроса. TF/Keras по умолчанию использует распараллеливание. Дайте ему некоторые данные, и он выполнит вывод на графическом процессоре или использует процессор с многопоточностью. Вам не следует пытаться выполнять распараллеливание самостоятельно. Почему это не
myobj.inference(data)
работает на вас?3. Вы правы в том, что Keras по умолчанию использует распараллеливание. Проблема в том, как организован мой конвейер, поскольку, когда я выполняю вызовы Kera, у меня нет единого представления данных (я использую Keras для извлечения некоторых специальных функций из моих данных). Так вот почему на данный момент мне нужно выполнять вызовы самостоятельно параллельно
Ответ №1:
Я смог воспроизвести это поведение tensorflow==2.2.0
с помощью keras==2.4.3
и.
joblib.Parallel
полагайтесь на pickle
совместное использование определения функции в нескольких процессах python, но pickle
реализация имеет несколько ограничений (doc).
Я не экспериментировал с преимуществами использования этого решения для последовательных запусков, учитывая, что вы не указали, ищете ли вы «многопроцессорное» или «многопоточное» решение. Но использование concurrent.futures
(doc) — это способ решения этой проблемы.
Модуль concurrent.futures предоставляет интерфейс высокого уровня для асинхронного выполнения вызываемых объектов.
Асинхронное выполнение может выполняться потоками с использованием ThreadPoolExecutor или отдельными процессами с использованием ProcessPoolExecutor. Оба реализуют один и тот же интерфейс, который определяется абстрактным классом исполнителя.
Это код, который я использовал для параллельного прогнозирования моей модели.
from tornado import concurrent
futures = []
# max_workers: The maximum number of threads that can be used to execute the given calls.
with concurrent.futures.ThreadPoolExecutor(max_workers=8) as executor:
for d in mydata:
# Schedules the callable "inference()" to be executed
# and returns a Future instance representing the execution of the "inference()".
future = executor.submit(myobj.inference, d)
futures.append(future)
# Loop over futures to wait for them in the order they were submitted:
for future in futures:
result = future.result()
print(result)
Чтобы получить каждый результат прогнозирования, как только он будет готов, даже если они выйдут из строя, вы можете использовать as_completed
:
for future in concurrent.futures.as_completed(futures):
print(future.result())
Вы можете провести некоторые тесты со своими данными, чтобы увидеть, действительно ли использование многопоточности может ускорить извлечение, учитывая, что это задача, связанная с процессором.
Вот некоторые из лучших советов и приложений по параллельной обработке python от fastai
сообщества: ссылка