Задачи сельдерея и задача в группе получены, но не выполняются

#python #flask #redis #celery #worker

#python #flask #redis #сельдерей #рабочий

Вопрос:

Я работаю над приложением, предназначенным для создания презентации на основе скриншотов, созданных с помощью selenium webdriver. Технологический стек: Python 3.8.6 , Flask 1.1.2 , Celery 4.4.7 , Redis server 4.0.9 Ubuntu 18.04.3 LTS . Когда я добавляю только одну задачу, рабочий сельдерея работает нормально, но когда я пытаюсь вызвать 2 или 3 задачи одну за другой, задачи принимаются, но никогда не выполняются. Время добавления задачи также влияет на поведение работника. Когда я добавляю одну задачу — ожидание около ~ 2 секунд, а затем добавляю вторую — все задачи выполняются нормально. Но когда я пытаюсь добавить 3 задачи — всегда возникают проблемы. Иногда первая задача выполняется, а другие нет, но иногда ни одна из них не выполняется. Это пример моего кода:

Я запускаю задачу с помощью $.ajax — post:

 $.ajax({
    type: 'POST',
    url: '/presentationTask/1234/4321',
    data: {
           filters_array: JSON.stringify([
               { filterTable : "Location", filterColumn: "City", filterValue: "Chicago, IL" }
    ])
    },
    success: function(data, status, request) {
        status_url = request.getResponseHeader('Location');
        update_progress(status_url, nanobar, div[0]);
    },
    error: function() {
       alert('Unexpected error');
    }
});
  

Маршрутизация:

 @bp.route('/presentationTask/<workspaceId>/<reportId>', methods=['POST'])
def presentationTask(workspaceId,reportId):
    filters = request.form['filters_array']
    task = createPresentation.apply_async(args=[workspaceId,reportId,filters])  
    
return jsonify({}), 202, {'Location': url_for('tasks.taskstatus',
                                                  task_id= task.id)}
  

И в методе «createPresntation» я должен много раз вызывать задачу «createScreen», поэтому я использовал «подпись» и «группа» для запуска задач в группе. Я жду, когда все задачи будут завершены, а затем присоединюсь к их результатам с помощью «join () / join_native ()»

 @celery.task(bind=True)
def createPresentation(self, workspaceId, reportId, filterValues):
    self.update_state(state='PENDING')
    .
    .
    .
    
    for filter in json_filters:
        for page in json_pages["value"]:
            jobList.append(createScreen.signature(args=[workspaceId, reportId, page["Name"], 
                    filter['filterTable'], filter['filterColumn'], filter['filterValue'], currentIndex, 
                    page["displayName"]]))
   
    pageReportJob = group(jobList)
    results = pageReportJob.apply_async()

    while not results.ready():
        current = results.completed_count()
        
        self.update_state(
            state='PROGRESS',
            meta={'current': current, 'total': total,'status': message})  
        time.sleep(2)

    with allow_join_result():
        results.join_native()
    ....
    
  
  

Я запускаю рабочий элемент сельдерея по команде:

 celery worker -A celery_worker.celery --loglevel=info --without-gossip 
--without-mingle --without-heartbeat -Ofair 
  

Ответ №1:

Я нашел решение — я установил Flower для мониторинга задач сельдерея и заметил, что сельдерей выделяет память для 2 основных задач (createPresentation) и ожидает выполнения задач (createScreen), но они никогда не будут выполнены, потому что все процессоры / потоки заняты задачами (createPresentation). Итак, я создал 2 очереди, одну с высоким приоритетом для createScreen и одну с приоритетом по умолчанию для createPresentation. Затем я создаю маршруты сельдерея и указываю, какие маршруты для каждого из них.

 CELERY_DEFAULT_QUEUE = 'default'
CELERY_QUEUES = (
    Queue('default'),
    Queue('priority_high'),
)

CELERY_ROUTES = {
    'app.screenshots.services.createScreen': {'queue': 'priority_high'},
    'app.presentation.services.createPresentation': {'queue': 'default'},
}