#python #ipython
#питон #ipython #python
Вопрос:
Я использую средство параллельной обработки IPython для большой операции с картой. В ожидании завершения операции map я хотел бы показать пользователю, сколько заданий завершено, сколько запущено и сколько осталось. Как я могу найти эту информацию?
Вот что я делаю. Я создаю профиль, который использует локальный движок, и запускаю двух рабочих. В оболочке:
$ ipython profile create --parallel --profile=local
$ ipcluster start --n=2 --profile=local
Вот клиентский скрипт Python:
#!/usr/bin/env python
def meat(i):
import numpy as np
import time
import sys
seconds = np.random.randint(2, 15)
time.sleep(seconds)
return seconds
import time
from IPython.parallel import Client
c = Client(profile='local')
dview = c[:]
ar = dview.map_async(meat, range(4))
elapsed = 0
while True:
print 'After %d s: %d running' % (elapsed, len(c.outstanding))
if ar.ready():
break
time.sleep(1)
elapsed = 1
print ar.get()
Пример вывода из скрипта:
After 0 s: 2 running
After 1 s: 2 running
After 2 s: 2 running
After 3 s: 2 running
After 4 s: 2 running
After 5 s: 2 running
After 6 s: 2 running
After 7 s: 2 running
After 8 s: 2 running
After 9 s: 2 running
After 10 s: 2 running
After 11 s: 2 running
After 12 s: 2 running
After 13 s: 2 running
After 14 s: 1 running
After 15 s: 1 running
After 16 s: 1 running
After 17 s: 1 running
After 18 s: 1 running
After 19 s: 1 running
After 20 s: 1 running
After 21 s: 1 running
After 22 s: 1 running
After 23 s: 1 running
[9, 14, 10, 3]
Как вы можете видеть, я могу получить количество выполняемых в данный момент заданий, но не количество завершенных (или оставшихся) заданий. Как я могу узнать, сколько map_async
заданий завершено?
Ответ №1:
у AsyncResult есть msg_ids
атрибут. Невыполненные задания — это пересечение с rc.outstanding , а выполненные задания — разница:
msgset = set(ar.msg_ids)
completed = msgset.difference(rc.outstanding)
pending = msgset.intersection(rc.outstanding)
Комментарии:
1. Это работает некорректно, когда я использую прямой интерфейс IPython (
c[:]
), как указано выше. Однако, если я использую интерфейс задачи (c.load_balanced_view()
), он работает прекрасно. Спасибо!2. При использовании прямого интерфейса идентификаторы msg_id не соответствуют отдельным элементам — последовательность разбита на фрагменты, поэтому для каждого движка существует только одна задача (т.Е. Один идентификатор msg_id). Аналогично, с интерфейсом задачи вы можете указать
chunksize=n
, и идентификаторы msg_id будут соответствовать количеству блоков , а не количеству отдельных элементов.