Есть ли какой-либо способ запустить «прогнозирование» предварительно загруженной модели Keras параллельно?

#python #tensorflow #keras #parallel-processing #joblib

Вопрос:

У меня есть класс, в котором я создаю модель Keras для выполнения предсказаний. Этот класс организован примерно так:

 class MyClass():
    def __init__(self):
        
        self.model = None

    def load(path):
        
        self.model = tf.keras.models.load_model(path_)

    def inference(data):

        #...
        pred = self.model.predict(data)
        #...
        
        return pred
 

Я пытался запустить этот MyClass.inference метод параллельно. Я попробовал это с joblib.Parallel :

 from joblib import Parallel, delayed

n_jobs = 8

myobj = MyClass()
myobj.load(<Path_to_model>)

results = Parallel(n_jobs=n_jobs )(delayed(myobj.inference)(d) for d in mydata))
 

Но я получаю следующую ошибку: TypeError: cannot pickle 'weakref' object

По-видимому, это известная проблема с Керасом (https://github.com/tensorflow/tensorflow/issues/34697), это должно было быть исправлено в TF 2.6.0. Но после обновления tensorflow до версии 2.6.0 я все равно получаю ту же ошибку. Я даже пробовал tf-каждый вечер, как предлагалось в том же выпуске, но и это не сработало.

Я также попытался заменить pickle на dill , на import dill as pickle , но это не исправило этого.

Единственное, что действительно сработало, — это замена loky бэкенда на Parallel threading . Однако в одном сценарии, который я попытался использовать threading , в конечном итоге заняло почти столько же времени (или немного медленнее), MyClass.inference что и последовательное выполнение вызовов.

Мой вопрос таков: каковы мои варианты здесь? Есть ли какой-либо способ параллельного запуска предварительно загруженной модели keras predict , например, с другими библиотеками python?

Комментарии:

1. В Python многопроцессорная обработка является по своей природе непостоянна :/ что часто работает в качестве обходного пути против странные мариновать вопросы-это вещь, которую вы хотите запустить параллельно автономная функции верхнего уровня, который «самодостаточен», потому что пытается работать параллельно предполагает маринование в соответствующем контексте (в mymodel класс в вашем случае).

2. Я не понимаю вопроса. TF/Keras по умолчанию использует распараллеливание. Дайте ему некоторые данные, и он выполнит вывод на графическом процессоре или использует процессор с многопоточностью. Вам не следует пытаться выполнять распараллеливание самостоятельно. Почему это не myobj.inference(data) работает на вас?

3. Вы правы в том, что Keras по умолчанию использует распараллеливание. Проблема в том, как организован мой конвейер, поскольку, когда я выполняю вызовы Kera, у меня нет единого представления данных (я использую Keras для извлечения некоторых специальных функций из моих данных). Так вот почему на данный момент мне нужно выполнять вызовы самостоятельно параллельно

Ответ №1:

Я смог воспроизвести это поведение tensorflow==2.2.0 с помощью keras==2.4.3 и.

joblib.Parallel полагайтесь на pickle совместное использование определения функции в нескольких процессах python, но pickle реализация имеет несколько ограничений (doc).

Я не экспериментировал с преимуществами использования этого решения для последовательных запусков, учитывая, что вы не указали, ищете ли вы «многопроцессорное» или «многопоточное» решение. Но использование concurrent.futures (doc) — это способ решения этой проблемы.

Модуль concurrent.futures предоставляет интерфейс высокого уровня для асинхронного выполнения вызываемых объектов.

Асинхронное выполнение может выполняться потоками с использованием ThreadPoolExecutor или отдельными процессами с использованием ProcessPoolExecutor. Оба реализуют один и тот же интерфейс, который определяется абстрактным классом исполнителя.

Это код, который я использовал для параллельного прогнозирования моей модели.

 from tornado import concurrent

futures = []
# max_workers: The maximum number of threads that can be used to execute the given calls.
with concurrent.futures.ThreadPoolExecutor(max_workers=8) as executor:
    for d in mydata:
        # Schedules the callable "inference()" to be executed 
        # and returns a Future instance representing the execution of the "inference()".
        future = executor.submit(myobj.inference, d)
        futures.append(future)

# Loop over futures to wait for them in the order they were submitted:
for future in futures:
    result = future.result()
    print(result)
 

Чтобы получить каждый результат прогнозирования, как только он будет готов, даже если они выйдут из строя, вы можете использовать as_completed :

 for future in concurrent.futures.as_completed(futures):
    print(future.result())
 

Вы можете провести некоторые тесты со своими данными, чтобы увидеть, действительно ли использование многопоточности может ускорить извлечение, учитывая, что это задача, связанная с процессором.

Вот некоторые из лучших советов и приложений по параллельной обработке python от fastai сообщества: ссылка