Как увидеть количество завершенных или оставшихся заданий map_async?

#python #ipython

#питон #ipython #python

Вопрос:

Я использую средство параллельной обработки IPython для большой операции с картой. В ожидании завершения операции map я хотел бы показать пользователю, сколько заданий завершено, сколько запущено и сколько осталось. Как я могу найти эту информацию?

Вот что я делаю. Я создаю профиль, который использует локальный движок, и запускаю двух рабочих. В оболочке:

 $ ipython profile create --parallel --profile=local
$ ipcluster start --n=2 --profile=local
  

Вот клиентский скрипт Python:

 #!/usr/bin/env python

def meat(i):
    import numpy as np
    import time
    import sys
    seconds = np.random.randint(2, 15)
    time.sleep(seconds)
    return seconds

import time
from IPython.parallel import Client

c = Client(profile='local')
dview = c[:]

ar = dview.map_async(meat, range(4))
elapsed = 0
while True:
    print 'After %d s: %d running' % (elapsed, len(c.outstanding))
    if ar.ready():
        break
    time.sleep(1)
    elapsed  = 1
print ar.get()
  

Пример вывода из скрипта:

 After 0 s: 2 running
After 1 s: 2 running
After 2 s: 2 running
After 3 s: 2 running
After 4 s: 2 running
After 5 s: 2 running
After 6 s: 2 running
After 7 s: 2 running
After 8 s: 2 running
After 9 s: 2 running
After 10 s: 2 running
After 11 s: 2 running
After 12 s: 2 running
After 13 s: 2 running
After 14 s: 1 running
After 15 s: 1 running
After 16 s: 1 running
After 17 s: 1 running
After 18 s: 1 running
After 19 s: 1 running
After 20 s: 1 running
After 21 s: 1 running
After 22 s: 1 running
After 23 s: 1 running
[9, 14, 10, 3]
  

Как вы можете видеть, я могу получить количество выполняемых в данный момент заданий, но не количество завершенных (или оставшихся) заданий. Как я могу узнать, сколько map_async заданий завершено?

Ответ №1:

у AsyncResult есть msg_ids атрибут. Невыполненные задания — это пересечение с rc.outstanding , а выполненные задания — разница:

 msgset = set(ar.msg_ids)
completed = msgset.difference(rc.outstanding)
pending = msgset.intersection(rc.outstanding)
  

Комментарии:

1. Это работает некорректно, когда я использую прямой интерфейс IPython ( c[:] ), как указано выше. Однако, если я использую интерфейс задачи ( c.load_balanced_view() ), он работает прекрасно. Спасибо!

2. При использовании прямого интерфейса идентификаторы msg_id не соответствуют отдельным элементам — последовательность разбита на фрагменты, поэтому для каждого движка существует только одна задача (т.Е. Один идентификатор msg_id). Аналогично, с интерфейсом задачи вы можете указать chunksize=n , и идентификаторы msg_id будут соответствовать количеству блоков , а не количеству отдельных элементов.