#ray
#ray
Вопрос:
Рассмотрим следующий пример:
import numpy as np
import ray
import time
A = np.array([42] * 4200)
@ray.remote
def foo1(x):
x[5]**2
@ray.remote
def foo2(j):
A[j]**2
ray.init()
#
# some warmup for ray
#
start = time.perf_counter()
for _ in range(1000):
ray.get(foo1.remote(A))
time_foo1 = time.perf_counter() - start
start = time.perf_counter()
for _ in range(1000):
ray.get(foo2.remote(5))
time_foo2 = time.perf_counter() - start
print(time_foo1, time_foo2)
ray.shutdown()
Похоже, что time_foo2
это значительно меньше, чем time_foo1
. Мое наивное предположение состояло бы в том, что ray сериализуется A
при каждом foo1
вызове. Однако, даже если я вручную помещу A
в хранилище объектов и передам ссылку на объект foo1
, я не вижу никаких улучшений. Может ли кто-нибудь просветить меня, что происходит здесь за кулисами?
Ответ №1:
Когда я запускаю ваш код, я получаю 0.8745803280000004 0.672677727
. Итак, foo2 меньше, но ненамного (возможно, A
было больше в вашем исходном коде?). При этом, вот объяснение того, что делает ray.
Когда функция аннотируется с помощью ray.remote
, она сериализуется, чтобы ее можно было отправлять удаленным процессам для запуска. Ray использует cloudpickle для сериализации. Когда функция сериализуется, ее глобальные зависимости также сериализуются.
В следующем примере A
приведен пример зависимости от глобальной переменной, которая должна быть сериализована.
@ray.remote
def foo2(j):
A[j]**2
При вызове удаленной функции Ray должен передать аргументы удаленной функции. Для небольших объектов есть оптимизация, но для больших объектов логика примерно следующая:
for each arg:
if arg is an ObjectRef,
do nothing
else,
replace arg with ray.put(arg)
На удаленном рабочем компьютере, когда вызывается удаленная функция, мы вызываем ray.get
все ссылки на объекты, прежде чем фактически вызывать функцию (опять же, мы фокусируемся только на больших объектах). ray.get
может извлечь выгоду из таких оптимизаций, как кэширование или чтение с нулевой копией, поэтому это часто намного дешевле, чем ray.put
.
На практике это означает, что следующий код
@ray.remote
def foo(arg):
# At this point ray.get(arg_ref) has already happened
A = np.arange(1_000_000)
foo.remote(A) # This is the same as foo.remote(ray.put(A))
foo.remote(A) # Same as foo.remote(ray.put(A)), which means it has happened twice now
Тогда как, если мы явно вызываем ray.put
, мы можем сэкономить put
A_ref = np.put(A)
foo.remote(A_ref) # ray.put is not called here
foo.remote(A_ref) # again, ray.put is not called
Когда я запускаю эти примеры с матрицей из 1 миллиона записей для A
, я получаю следующие времена (вот мой пример кода):
Time putting A every time: 3.041259899
Time passing ref of A: 0.7547513060000002
Time serializing A in function: 0.7694220469999999
Обратите внимание, что, хотя сериализация A была быстрой, это плохая практика и не рекомендуется. Это связано с тем, что объекты помещаются в хранилище объектов, а сериализованные функции помещаются в хранилище элементов управления, а хранилище элементов управления не предназначено для передачи больших объемов данных.
Комментарии:
1. Большое спасибо за этот исчерпывающий ответ!