Как ray работает с переменными, охватывающими область?

#ray

#ray

Вопрос:

Рассмотрим следующий пример:

 import numpy as np
import ray
import time

A = np.array([42] * 4200)   

@ray.remote
def foo1(x):
    x[5]**2

@ray.remote
def foo2(j):
    A[j]**2

ray.init()

#
# some warmup for ray
#

start = time.perf_counter()
for _ in range(1000):
    ray.get(foo1.remote(A))
time_foo1 = time.perf_counter() - start

start = time.perf_counter()
for _ in range(1000):
    ray.get(foo2.remote(5))
time_foo2 = time.perf_counter() - start

print(time_foo1, time_foo2)

ray.shutdown()
  

Похоже, что time_foo2 это значительно меньше, чем time_foo1 . Мое наивное предположение состояло бы в том, что ray сериализуется A при каждом foo1 вызове. Однако, даже если я вручную помещу A в хранилище объектов и передам ссылку на объект foo1 , я не вижу никаких улучшений. Может ли кто-нибудь просветить меня, что происходит здесь за кулисами?

Ответ №1:

Когда я запускаю ваш код, я получаю 0.8745803280000004 0.672677727 . Итак, foo2 меньше, но ненамного (возможно, A было больше в вашем исходном коде?). При этом, вот объяснение того, что делает ray.

Когда функция аннотируется с помощью ray.remote , она сериализуется, чтобы ее можно было отправлять удаленным процессам для запуска. Ray использует cloudpickle для сериализации. Когда функция сериализуется, ее глобальные зависимости также сериализуются.

В следующем примере A приведен пример зависимости от глобальной переменной, которая должна быть сериализована.

 @ray.remote
def foo2(j):
    A[j]**2
  

При вызове удаленной функции Ray должен передать аргументы удаленной функции. Для небольших объектов есть оптимизация, но для больших объектов логика примерно следующая:

 for each arg:
    if arg is an ObjectRef,
        do nothing
    else,
        replace arg with ray.put(arg)
  

На удаленном рабочем компьютере, когда вызывается удаленная функция, мы вызываем ray.get все ссылки на объекты, прежде чем фактически вызывать функцию (опять же, мы фокусируемся только на больших объектах). ray.get может извлечь выгоду из таких оптимизаций, как кэширование или чтение с нулевой копией, поэтому это часто намного дешевле, чем ray.put .

На практике это означает, что следующий код

 @ray.remote
def foo(arg):
    # At this point ray.get(arg_ref) has already happened

A = np.arange(1_000_000)
foo.remote(A) # This is the same as foo.remote(ray.put(A))
foo.remote(A) # Same as foo.remote(ray.put(A)), which means it has happened twice now
  

Тогда как, если мы явно вызываем ray.put , мы можем сэкономить put

 A_ref = np.put(A) 
foo.remote(A_ref) # ray.put is not called here
foo.remote(A_ref) # again, ray.put is not called
  

Когда я запускаю эти примеры с матрицей из 1 миллиона записей для A , я получаю следующие времена (вот мой пример кода):

 Time putting A every time: 3.041259899
Time passing ref of A: 0.7547513060000002
Time serializing A in function: 0.7694220469999999
  

Обратите внимание, что, хотя сериализация A была быстрой, это плохая практика и не рекомендуется. Это связано с тем, что объекты помещаются в хранилище объектов, а сериализованные функции помещаются в хранилище элементов управления, а хранилище элементов управления не предназначено для передачи больших объемов данных.

Комментарии:

1. Большое спасибо за этот исчерпывающий ответ!