Python RQ: шаблон для обратного вызова

#python #parallel-processing #python-rq

#python #параллельная обработка #python-rq

Вопрос:

Теперь у меня есть большое количество документов для обработки, и я использую Python RQ для распараллеливания задачи.

Я хотел бы, чтобы конвейер работы выполнялся по мере выполнения разных операций над каждым документом. Например: A -> B -> C означает передать документ функции A , после A завершения перейдите к B и последнему C .

Однако Python RQ, похоже, не очень хорошо поддерживает конвейерные материалы.

Вот простой, но несколько грязный способ сделать это. Одним словом, каждая функция в конвейере вызывает свою следующую функцию вложенным способом.

Например, для конвейера A -> B -> C .

На верхнем уровне некоторый код написан следующим образом:

q.enqueue(A, the_doc)

где q — Queue экземпляр, а в функции A есть код, подобный:

q.enqueue(B, the_doc)

И в B , есть что — то вроде этого:

q.enqueue(C, the_doc)

Есть ли какой-нибудь другой способ, более элегантный, чем этот? Например, некоторый код в ОДНОЙ функции:

q.enqueue(A, the_doc)
q.enqueue(B, the_doc, after = A)
q.enqueue(C, the_doc, after= B)

параметр depends_on является наиболее близким к моему требованию, однако, выполняя что-то вроде:


A_job = q.enqueue(A, the_doc)
q.enqueue(B, depends_on=A_job )

не будет работать. As q.enqueue(B, depends_on=A_job ) выполняется сразу после A_job = q.enqueue(A, the_doc) выполнения. К моменту постановки B в очередь результат от A может быть не готов, поскольку для его обработки требуется время.

PS:

Если Python RQ не очень хорош в этом, какой еще инструмент в Python я могу использовать для достижения той же цели:

  1. циклическое распараллеливание
  2. поддержка конвейерной обработки

Комментарии:

1. как насчет использования 3 очередей? один для задания a, затем другой для задания b и последний для задания c, единственное, что когда задание a заканчивается, документ ставится в очередь на очередь задания b и так далее…

2. Даже если q.enqueue(B, depends_on=A_job ) задание B будет обработано после завершения A. Разве не важно, когда он обрабатывает , а не ставит в очередь ?

3. @Marcel Чем использование 3 очередей отличается от использования одной очереди? Как бы это решить?

4. вы нашли ответ на это?

Ответ №1:

К моменту постановки B в очередь результат от A может быть не готов, поскольку для его обработки требуется время.

Я не уверен, было ли это на самом деле правдой, когда вы изначально опубликовали вопрос, но в любом случае сейчас это не так. Фактически, эта depends_on функция создана именно для описанного вами рабочего процесса.

Верно, что эти две функции выполняются немедленно последовательно.

 A_job = q.enqueue(A, the_doc)
B_job = q.enqueue(B, depends_on=A_job )
 

Но рабочий не будет выполняться B до A тех пор, пока не будет завершен. Пока A_job не будет успешно выполнено, B.status == 'deferred' . Один раз A.status == 'finished' , затем B начнется запуск.

Это означает, что B and C может получать доступ к результатам своих зависимостей и работать с ними следующим образом:

 import time
from rq import Queue, get_current_job
from redis import StrictRedis

conn = StrictRedis()
q = Queue('high', connection=conn)

def A():
    time.sleep(100)
    return 'result A'

def B():
    time.sleep(100)
    current_job = get_current_job(conn)
    a_job_id = current_job.dependencies[0].id
    a_job_result = q.fetch_job(a_job_id).result
    assert a_job_result == 'result A'
    return a_job_result   ' result B'


def C():
    time.sleep(100)
    current_job = get_current_job(conn)
    b_job_id = current_job.dependencies[0].id
    b_job_result = q.fetch_job(b_job_id).result
    assert b_job_result == 'result A result B'
    return b_job_result   ' result C'
 

Рабочий в конечном итоге напечатает 'result A result B result C' .

Кроме того, если у вас много заданий в очереди и B они могут ждать некоторое время перед выполнением, вы можете значительно увеличить result_ttl или сделать его неопределенным с result_ttl=-1 помощью . В противном случае результат A будет удален по истечении заданного количества секунд result_ttl , и в этом случае B он больше не сможет получить к нему доступ и вернуть желаемый результат.

Однако настройка result_ttl=-1 имеет важные последствия для памяти. Это означает, что ваш результат ваших заданий никогда не будет автоматически удален, и объем памяти будет пропорционально расти, пока вы вручную не удалите эти результаты из redis.

Комментарии:

1. Какова цель добавления режима ожидания в каждое задание?

2. @user248884 для имитации длительного процесса.

Ответ №2:

параметр depends_on является наиболее близким к моему требованию, однако, выполняя что-то вроде:

A_job = q.enqueue(A, the_doc) q.enqueue(B, depends_on=A_job )

не будет работать. Поскольку q.enqueue(B, depends_on=A_job ) выполняется сразу после выполнения A_job = q.enqueue(A, the_doc) . К моменту постановки B в очередь результат от A может быть не готов, поскольку для его обработки требуется время.

В этом случае q.enqueue(B, depends_on=A_job) будет выполняться после завершения A_job. Если результат не готов, q.enqueue(B, depends_on=A_job) будет ждать, пока он не будет готов.


Он не поддерживает его из коробки, но с использованием других технологий это возможно.

В моем случае я использовал кэширование для отслеживания предыдущего задания в цепочке, поэтому, когда мы хотим поставить в очередь новую функцию (для запуска сразу после), мы можем правильно установить ее параметр ‘depends_on’ при вызове enqueue ()

Обратите внимание на использование дополнительных параметров для постановки в очередь: ‘timeout, result_ttl, ttl’. Они использовались, поскольку я выполнял длинные задания на rq. Вы можете ссылаться на их использование в документах python rq.

Я использовал django_rq.enqueue(), который является производным от python rq

     # main.py
    def process_job():
        ...

        # Create a cache key for every chain of methods you want to call.
        # NOTE: I used this for web development, in your case you may want
        # to use a variable or a database, not caching

        # Number of time to cache and keep the results in rq
        TWO_HRS = 60 * 60 * 2

        cache_key = 'update-data-key-%s' % obj.id
        previous_job_id = cache.get(cache_key)
        job = django_rq.enqueue(update_metadata,
                                campaign=campaign,
                                list=chosen_list,
                                depends_on=previous_job_id,
                                timeout=TWO_HRS,
                                result_ttl=TWO_HRS,
                                ttl=TWO_HRS)

        # Set the value for the most recent finished job, so the next function
        # in the chain can set the proper value for 'depends_on'
        cache.set(token_key, job.id, TWO_HRS)

    # utils.py
    def update_metadata(campaign, list):
        # Your code goes here to update the campaign object with the list object
        pass
 

‘depends_on’ — из документов rq:

depends_on — указывает другое задание (или идентификатор задания), которое должно быть выполнено до того, как это задание будет поставлено в очередь