#python #flask #redis #celery #worker
#python #flask #redis #сельдерей #рабочий
Вопрос:
Я работаю над приложением, предназначенным для создания презентации на основе скриншотов, созданных с помощью selenium webdriver. Технологический стек: Python 3.8.6
, Flask 1.1.2
, Celery 4.4.7
, Redis server 4.0.9
Ubuntu 18.04.3 LTS
. Когда я добавляю только одну задачу, рабочий сельдерея работает нормально, но когда я пытаюсь вызвать 2 или 3 задачи одну за другой, задачи принимаются, но никогда не выполняются. Время добавления задачи также влияет на поведение работника. Когда я добавляю одну задачу — ожидание около ~ 2 секунд, а затем добавляю вторую — все задачи выполняются нормально. Но когда я пытаюсь добавить 3 задачи — всегда возникают проблемы. Иногда первая задача выполняется, а другие нет, но иногда ни одна из них не выполняется. Это пример моего кода:
Я запускаю задачу с помощью $.ajax — post:
$.ajax({
type: 'POST',
url: '/presentationTask/1234/4321',
data: {
filters_array: JSON.stringify([
{ filterTable : "Location", filterColumn: "City", filterValue: "Chicago, IL" }
])
},
success: function(data, status, request) {
status_url = request.getResponseHeader('Location');
update_progress(status_url, nanobar, div[0]);
},
error: function() {
alert('Unexpected error');
}
});
Маршрутизация:
@bp.route('/presentationTask/<workspaceId>/<reportId>', methods=['POST'])
def presentationTask(workspaceId,reportId):
filters = request.form['filters_array']
task = createPresentation.apply_async(args=[workspaceId,reportId,filters])
return jsonify({}), 202, {'Location': url_for('tasks.taskstatus',
task_id= task.id)}
И в методе «createPresntation» я должен много раз вызывать задачу «createScreen», поэтому я использовал «подпись» и «группа» для запуска задач в группе. Я жду, когда все задачи будут завершены, а затем присоединюсь к их результатам с помощью «join () / join_native ()»
@celery.task(bind=True)
def createPresentation(self, workspaceId, reportId, filterValues):
self.update_state(state='PENDING')
.
.
.
for filter in json_filters:
for page in json_pages["value"]:
jobList.append(createScreen.signature(args=[workspaceId, reportId, page["Name"],
filter['filterTable'], filter['filterColumn'], filter['filterValue'], currentIndex,
page["displayName"]]))
pageReportJob = group(jobList)
results = pageReportJob.apply_async()
while not results.ready():
current = results.completed_count()
self.update_state(
state='PROGRESS',
meta={'current': current, 'total': total,'status': message})
time.sleep(2)
with allow_join_result():
results.join_native()
....
Я запускаю рабочий элемент сельдерея по команде:
celery worker -A celery_worker.celery --loglevel=info --without-gossip
--without-mingle --without-heartbeat -Ofair
Ответ №1:
Я нашел решение — я установил Flower для мониторинга задач сельдерея и заметил, что сельдерей выделяет память для 2 основных задач (createPresentation) и ожидает выполнения задач (createScreen), но они никогда не будут выполнены, потому что все процессоры / потоки заняты задачами (createPresentation). Итак, я создал 2 очереди, одну с высоким приоритетом для createScreen и одну с приоритетом по умолчанию для createPresentation. Затем я создаю маршруты сельдерея и указываю, какие маршруты для каждого из них.
CELERY_DEFAULT_QUEUE = 'default'
CELERY_QUEUES = (
Queue('default'),
Queue('priority_high'),
)
CELERY_ROUTES = {
'app.screenshots.services.createScreen': {'queue': 'priority_high'},
'app.presentation.services.createPresentation': {'queue': 'default'},
}