#python #parallel-processing #python-rq
#python #параллельная обработка #python-rq
Вопрос:
Теперь у меня есть большое количество документов для обработки, и я использую Python RQ для распараллеливания задачи.
Я хотел бы, чтобы конвейер работы выполнялся по мере выполнения разных операций над каждым документом. Например: A
-> B
-> C
означает передать документ функции A
, после A
завершения перейдите к B
и последнему C
.
Однако Python RQ, похоже, не очень хорошо поддерживает конвейерные материалы.
Вот простой, но несколько грязный способ сделать это. Одним словом, каждая функция в конвейере вызывает свою следующую функцию вложенным способом.
Например, для конвейера A
-> B
-> C
.
На верхнем уровне некоторый код написан следующим образом:
q.enqueue(A, the_doc)
где q — Queue
экземпляр, а в функции A
есть код, подобный:
q.enqueue(B, the_doc)
И в B
, есть что — то вроде этого:
q.enqueue(C, the_doc)
Есть ли какой-нибудь другой способ, более элегантный, чем этот? Например, некоторый код в ОДНОЙ функции:
q.enqueue(A, the_doc)
q.enqueue(B, the_doc, after = A)
q.enqueue(C, the_doc, after= B)
параметр depends_on является наиболее близким к моему требованию, однако, выполняя что-то вроде:
A_job = q.enqueue(A, the_doc)
q.enqueue(B, depends_on=A_job )
не будет работать. As q.enqueue(B, depends_on=A_job )
выполняется сразу после A_job = q.enqueue(A, the_doc)
выполнения. К моменту постановки B в очередь результат от A может быть не готов, поскольку для его обработки требуется время.
PS:
Если Python RQ не очень хорош в этом, какой еще инструмент в Python я могу использовать для достижения той же цели:
- циклическое распараллеливание
- поддержка конвейерной обработки
Комментарии:
1. как насчет использования 3 очередей? один для задания a, затем другой для задания b и последний для задания c, единственное, что когда задание a заканчивается, документ ставится в очередь на очередь задания b и так далее…
2. Даже если
q.enqueue(B, depends_on=A_job )
задание B будет обработано после завершения A. Разве не важно, когда он обрабатывает , а не ставит в очередь ?3. @Marcel Чем использование 3 очередей отличается от использования одной очереди? Как бы это решить?
4. вы нашли ответ на это?
Ответ №1:
К моменту постановки B в очередь результат от A может быть не готов, поскольку для его обработки требуется время.
Я не уверен, было ли это на самом деле правдой, когда вы изначально опубликовали вопрос, но в любом случае сейчас это не так. Фактически, эта depends_on
функция создана именно для описанного вами рабочего процесса.
Верно, что эти две функции выполняются немедленно последовательно.
A_job = q.enqueue(A, the_doc)
B_job = q.enqueue(B, depends_on=A_job )
Но рабочий не будет выполняться B
до A
тех пор, пока не будет завершен. Пока A_job
не будет успешно выполнено, B.status == 'deferred'
. Один раз A.status == 'finished'
, затем B
начнется запуск.
Это означает, что B
and C
может получать доступ к результатам своих зависимостей и работать с ними следующим образом:
import time
from rq import Queue, get_current_job
from redis import StrictRedis
conn = StrictRedis()
q = Queue('high', connection=conn)
def A():
time.sleep(100)
return 'result A'
def B():
time.sleep(100)
current_job = get_current_job(conn)
a_job_id = current_job.dependencies[0].id
a_job_result = q.fetch_job(a_job_id).result
assert a_job_result == 'result A'
return a_job_result ' result B'
def C():
time.sleep(100)
current_job = get_current_job(conn)
b_job_id = current_job.dependencies[0].id
b_job_result = q.fetch_job(b_job_id).result
assert b_job_result == 'result A result B'
return b_job_result ' result C'
Рабочий в конечном итоге напечатает 'result A result B result C'
.
Кроме того, если у вас много заданий в очереди и B
они могут ждать некоторое время перед выполнением, вы можете значительно увеличить result_ttl
или сделать его неопределенным с result_ttl=-1
помощью . В противном случае результат A будет удален по истечении заданного количества секунд result_ttl
, и в этом случае B
он больше не сможет получить к нему доступ и вернуть желаемый результат.
Однако настройка result_ttl=-1
имеет важные последствия для памяти. Это означает, что ваш результат ваших заданий никогда не будет автоматически удален, и объем памяти будет пропорционально расти, пока вы вручную не удалите эти результаты из redis.
Комментарии:
1. Какова цель добавления режима ожидания в каждое задание?
2. @user248884 для имитации длительного процесса.
Ответ №2:
параметр depends_on является наиболее близким к моему требованию, однако, выполняя что-то вроде:
A_job = q.enqueue(A, the_doc) q.enqueue(B, depends_on=A_job )
не будет работать. Поскольку q.enqueue(B, depends_on=A_job ) выполняется сразу после выполнения A_job = q.enqueue(A, the_doc) . К моменту постановки B в очередь результат от A может быть не готов, поскольку для его обработки требуется время.
В этом случае q.enqueue(B, depends_on=A_job) будет выполняться после завершения A_job. Если результат не готов, q.enqueue(B, depends_on=A_job) будет ждать, пока он не будет готов.
Он не поддерживает его из коробки, но с использованием других технологий это возможно.
В моем случае я использовал кэширование для отслеживания предыдущего задания в цепочке, поэтому, когда мы хотим поставить в очередь новую функцию (для запуска сразу после), мы можем правильно установить ее параметр ‘depends_on’ при вызове enqueue ()
Обратите внимание на использование дополнительных параметров для постановки в очередь: ‘timeout, result_ttl, ttl’. Они использовались, поскольку я выполнял длинные задания на rq. Вы можете ссылаться на их использование в документах python rq.
Я использовал django_rq.enqueue(), который является производным от python rq
# main.py
def process_job():
...
# Create a cache key for every chain of methods you want to call.
# NOTE: I used this for web development, in your case you may want
# to use a variable or a database, not caching
# Number of time to cache and keep the results in rq
TWO_HRS = 60 * 60 * 2
cache_key = 'update-data-key-%s' % obj.id
previous_job_id = cache.get(cache_key)
job = django_rq.enqueue(update_metadata,
campaign=campaign,
list=chosen_list,
depends_on=previous_job_id,
timeout=TWO_HRS,
result_ttl=TWO_HRS,
ttl=TWO_HRS)
# Set the value for the most recent finished job, so the next function
# in the chain can set the proper value for 'depends_on'
cache.set(token_key, job.id, TWO_HRS)
# utils.py
def update_metadata(campaign, list):
# Your code goes here to update the campaign object with the list object
pass
‘depends_on’ — из документов rq:
depends_on — указывает другое задание (или идентификатор задания), которое должно быть выполнено до того, как это задание будет поставлено в очередь