Как распараллелить классификацию с классификацией с нулевым выстрелом с помощью Huggingface?

#python-3.x #redis #classification #huggingface-transformers #ray

#python-3.x #redis #классификация #huggingface-трансформеры #ray

Вопрос:

У меня около 70 категорий (также может быть 20 или 30), и я хочу иметь возможность распараллелить процесс с помощью ray, но я получаю сообщение об ошибке:

 import pandas as pd
import swifter
import json
import ray
from transformers import pipeline

classifier = pipeline("zero-shot-classification")

labels = ["vegetables", "potato", "bell pepper", "tomato", "onion", "carrot", "broccoli",
          "lettuce", "cucumber", "celery", "corn", "garlic", "mashrooms", "cabbage", "spinach",
          "beans", "cauliflower", "asparagus", "fruits", "bananas", "apples", "strawberries",
          "grapes", "oranges", "lemons", "avocados", "peaches", "blueberries", "pineapple",
          "cherries", "pears", "mangoe", "berries", "red meat", "beef", "pork", "mutton",
          "veal", "lamb", "venison", "goat", "mince", "white meat", "chicken", "turkey",
          "duck", "goose", "pheasant", "rabbit", "Processed meat", "sausages", "bacon",
          "ham", "hot dogs", "frankfurters", "tinned meat", "salami", "pâtés", "beef jerky",
          "chorizo", "pepperoni", "corned beef", "fish", "catfish", "cod", "pangasius", "pollock",
          "tilapia", "tuna", "salmon", "seafood", "shrimp", "squid", "mussels", "scallop",
          "octopus", "grains", "rice", "wheat", "bulgur", "corn", "oat", "quinoa", "buckwheat",
          "meals", "salad", "soup", "steak", "pizza", "pie", "burger", "backery", "bread", "souce",
          "pasta", "sandwich", "waffles", "barbecue", "roll", "wings", "ribs", "cookies"]


ray.init()
@ray.remote
def get_meal_category(seq, labels, n=3):
    res_dict = classifier(seq, labels)
    return list(zip([seq for i in range(n)], res_dict["labels"][0:n], res_dict["scores"][0:n]))

res_list = ray.get([get_meal_category.remote(merged_df["title"][i], labels) for i in range(10)])
 

Где merged_df — это большой фрейм данных с названиями блюд в столбце labels, например:

 ['Cappuccino',
 'Stove Top Stuffing Mix For Turkey (Kraft)',
 'Stove Top Stuffing Mix For Turkey (Kraft)',
 'Roasted Dark Turkey Meat',
 'Roasted Dark Turkey Meat',
 'Roasted Dark Turkey Meat',
 'Cappuccino',
 'Low Fat 2% Small Curd Cottage Cheese (Daisy)',
 'Rice Cereal (Gerber)',
 'Oranges']
 

Пожалуйста, посоветуйте, как избежать ошибки Рэя и распараллелить классификацию.

Ошибка:

 2021-02-17 16:54:51,689 WARNING worker.py:1107 -- Warning: The remote function __main__.get_meal_category has size 1630925709 when pickled. It will be stored in Redis, which could cause memory issues. This may mean that its definition uses a large array or other object.
---------------------------------------------------------------------------
ConnectionResetError                      Traceback (most recent call last)
~/.local/lib/python3.8/site-packages/redis/connection.py in send_packed_command(self, command, check_health)
    705             for item in command:
--> 706                 sendall(self._sock, item)
    707         except socket.timeout:

~/.local/lib/python3.8/site-packages/redis/_compat.py in sendall(sock, *args, **kwargs)
      8 def sendall(sock, *args, **kwargs):
----> 9     return sock.sendall(*args, **kwargs)
     10 

ConnectionResetError: [Errno 104] Connection reset by peer

During handling of the above exception, another exception occurred:

ConnectionError                           Traceback (most recent call last)
<ipython-input-9-1a5345832fba> in <module>
----> 1 res_list = ray.get([get_meal_category.remote(merged_df["title"][i], labels) for i in range(10)])

<ipython-input-9-1a5345832fba> in <listcomp>(.0)
----> 1 res_list = ray.get([get_meal_category.remote(merged_df["title"][i], labels) for i in range(10)])

~/.local/lib/python3.8/site-packages/ray/remote_function.py in _remote_proxy(*args, **kwargs)
     99         @wraps(function)
    100         def _remote_proxy(*args, **kwargs):
--> 101             return self._remote(args=args, kwargs=kwargs)
    102 
    103         self.remote = _remote_proxy

~/.local/lib/python3.8/site-packages/ray/remote_function.py in _remote(self, args, kwargs, num_returns, num_cpus, num_gpus, memory, object_store_memory, accelerator_type, resources, max_retries, placement_group, placement_group_bundle_index, placement_group_capture_child_tasks, override_environment_variables, name)
    205 
    206             self._last_export_session_and_job = worker.current_session_and_job
--> 207             worker.function_actor_manager.export(self)
    208 
    209         kwargs = {} if kwargs is None else kwargs

~/.local/lib/python3.8/site-packages/ray/function_manager.py in export(self, remote_function)
    142         key = (b"RemoteFunction:"   self._worker.current_job_id.binary()   b":"
    143                  remote_function._function_descriptor.function_id.binary())
--> 144         self._worker.redis_client.hset(
    145             key,
    146             mapping={

~/.local/lib/python3.8/site-packages/redis/client.py in hset(self, name, key, value, mapping)
   3048                 items.extend(pair)
   3049 
-> 3050         return self.execute_command('HSET', name, *items)
   3051 
   3052     def hsetnx(self, name, key, value):

~/.local/lib/python3.8/site-packages/redis/client.py in execute_command(self, *args, **options)
    898         conn = self.connection or pool.get_connection(command_name, **options)
    899         try:
--> 900             conn.send_command(*args)
    901             return self.parse_response(conn, command_name, **options)
    902         except (ConnectionError, TimeoutError) as e:

~/.local/lib/python3.8/site-packages/redis/connection.py in send_command(self, *args, **kwargs)
    723     def send_command(self, *args, **kwargs):
    724         "Pack and send a command to the Redis server"
--> 725         self.send_packed_command(self.pack_command(*args),
    726                                  check_health=kwargs.get('check_health', True))
    727 

~/.local/lib/python3.8/site-packages/redis/connection.py in send_packed_command(self, command, check_health)
    715                 errno = e.args[0]
    716                 errmsg = e.args[1]
--> 717             raise ConnectionError("Error %s while writing to socket. %s." %
    718                                   (errno, errmsg))
    719         except BaseException:

ConnectionError: Error 104 while writing to socket. Connection reset by peer.
 

Комментарии:

1. Какую версию Ray вы используете? Можете ли вы попробовать использовать как последнюю версию (1.2), так и ночные колеса ( docs.ray.io/en/master /… ) и посмотрите, получаете ли вы ту же ошибку.

2. @AmogKamsetty Я использую версию ray 1.2.0. Сейчас я проверю насчет ночных колес.

3. @AmogKamsetty размер классификатора составляет 1,6 ГБ, поэтому он очень тяжелый, и если бы я мог использовать ссылку на модель, это могло бы решить проблему. Пожалуйста, посоветуйте

Ответ №1:

Эта ошибка возникает из-за отправки больших объектов в redis. merged_df это большой фрейм данных, и поскольку вы вызываете get_meal_category 10 раз, Ray попытается выполнить сериализацию merged_df 10 раз. Вместо этого, если вы помещаете merged_df в хранилище объектов Ray только один раз, а затем передаете ссылку на объект, это должно сработать.

РЕДАКТИРОВАТЬ: поскольку классификатор также большой, сделайте что-то подобное и для этого.

Можете ли вы попробовать что-то подобное:

 ray.init()
df_ref = ray.put(merged_df)
model_ref = ray.put(classifier)

@ray.remote
def get_meal_category(classifier, df, i, labels, n=3):
    seq = df["title"][i]
    res_dict = classifier(seq, labels)
    return list(zip([seq for i in range(n)], res_dict["labels"][0:n], res_dict["scores"][0:n]))

res_list = ray.get([get_meal_category.remote(model_ref, df_ref, i, labels) for i in range(10)])
 

Комментарии:

1. Могу ли я сделать то же самое с функцией? Ошибка говорит, что это связано с функцией.

2. К сожалению, это не решило проблему, это функция, которая вызывает эту ошибку. Попробовал ваше решение, и оно не сработало.

3. Я изменил ответ, чтобы сделать то же самое для классификатора. Можете ли вы попробовать это тоже?

4. @SteveS это сбой с той же ошибкой? У вас есть воспроизводимый пример, который я могу запустить?

5. Просто возьмите столбец с 2000 названиями блюд отсюда: foodb.ca/downloads