#python #distributed-computing #dask
#python #распределенные вычисления #dask
Вопрос:
Я хочу добавить обратный вызов в будущее, как только он будет завершен.
Согласно документации:
Вызовите обратный вызов в будущем, когда обратный вызов завершится.
Функция обратного вызова fn должна принимать future в качестве своего единственного аргумента. Это будет вызвано независимо от того, успешно ли завершается future, выдает ошибку или отменяется.
Обратный вызов выполняется в отдельном потоке.
Это не дает мне того, что мне нужно, из-за требования к обратному вызову fn принимать future в качестве единственного аргумента.
Вот пример кода части того, что я пытаюсь сделать:
def method(cu_device_id):
print("Hello world, I'm going to use GPU %i" % cu_device_id)
def callback_fn(cu_device_id)
gpu_queue.put(cu_device_id)
cu_device_id = gpu_queue.get()
future = client.submit(method, cu_device_id)
#gpu_queue.put(cu_device_id) # Does not work, clients will shortly end up piled onto the slowest GPU
result.add_done_callback(callback_fn) # Crash / no way to pass in cu_device_id
Идея здесь в том, чтобы клиент взял доступный графический процессор из очереди, а затем, как только он закончит его использовать, вернул его в очередь, чтобы другой клиент мог его использовать.
Один из способов обойти это — передать gpu_queue в client:
def method(gpu_queue):
cu_device_id = gpu_queue.get()
print("Hello world, I'm going to use GPU %i" % cu_device_id)
gpu_queue.put(cu_device_id)
future = client.submit(method, gpu_queue)
Все работает так, как ожидалось, вот так. Но я предпочитаю иметь возможность делать это извне Чего мне не хватает или не вижу, чтобы заставить это работать?
Спасибо
Ответ №1:
Используйте functools.partial. Это помогает отправлять параметры в таких сценариях. Подробнее об этом здесь. Кроме того, теперь официальная документация также включает информацию о том, как отправлять параметры в add_done_callback
Ответ №2:
Это должно быть обработано на стороне сервера. Когда задача выполнена, экземпляр класса Future (будущее) передается в callback_fn. Вы можете передать аргумент, обернув это определение функции другой функцией, которая возвращает обратный вызов в правильном формате:
def method(cu_device_id):
print("Hello world, I'm going to use GPU %i" % cu_device_id)
def get_callback_fn(cu_device_id):
def callback_fn(future):
gpu_queue.put(cu_device_id)
return callback_fn
cu_device_id = gpu_queue.get()
future = client.submit(method, cu_device_id)
future.add_done_callback(get_callback_fn(cu_device_id))
Ответ №3:
Вы могли бы также рассмотреть возможность обработки этого на стороне клиента с as_completed
итератором
data = iter(data)
futures = []
using_gpu = {}
for i in range(n_gpus):
future = client.submit(process, next(data), use_gpu=i)
using_gpu[future] = i
seq = as_completed(futures)
for future in seq:
gpu = using_gpu.pop(future)
new = client.submit(process, next(data), use_gpu=gpu) # TODO: handle end of data sequence gracefully
using_gpu[new] = gpu
seq.add(new) # add this into the sequence