#python #email #flask #celery
#python #Адрес электронной почты #flask #сельдерей
Вопрос:
Я пытаюсь отправить почту в задаче celery с помощью flask-mail, однако я продолжаю получать эту ошибку во время выполнения RuntimeError('working outside of application context',)
. Это код, который у меня есть в самом файле:
from app import app
from celery import Celery
from flask.ext.mail import Message
from flask import current_app
# config
app.config['CELERY_BROKER_URL'] = 'redis://localhost:6379/0'
app.config['CELERY_RESULT_BACKEND'] = 'redis://localhost:6379/0'
# set up celery
celery = Celery(app.name, broker=app.config['CELERY_BROKER_URL'])
celery.conf.update(app.config)
# put the processing task here
@celery.task
def send_results(filename, email_addr):
msg = Message('testing email', recipients=[email_addr])
msg.body = 'testing this funct'
with app.app_context():
current_app.mail.send(msg)
print(filename)
print(email_addr)
Обратите внимание, что там есть app.app_context()
строка (на данный момент я не уверен, правильно это или нет).
Еще одна вещь, на которую следует обратить внимание, это то, что приложение еще не полностью «завершено». В частности, я имею в виду, что создание приложения обрабатывается функцией с именем create_app, как показано здесь:https://github.com/swarajd/seq2flux/blob/master/app/startup/create_app.py
Эта функция вызывается в manage.py здесь:https://github.com/swarajd/seq2flux/blob/master/manage.py
Я пытаюсь вызвать эту задачу в файле с именем views.py
, который обрабатывает все просмотры. Одна функция, в частности, которая имеет дело с этим, заключается в следующем:
@core_blueprint.route('data_analysis', methods=['GET', 'POST'])
@login_required
def data_analysis():
# print(request.form)
form = DataAnalysisForm(CombinedMultiDict((request.files, request.form)))
# print(form)
if (request.method == 'POST' and form.validate()):
# print(request.form)
# print(request.files)
file = request.files['seqFile']
filename = secure_filename(file.filename)
file.save(os.path.join(app.config['UPLOAD_FOLDER'], filename))
mail_engine = current_app.extensions.get('mail', None)
# print(mail_engine)
# print(current_user)
send_results.delay(filename, current_user.email)
flash('processing job scheduled!')
# Redirect to home page
return redirect(url_for('core.data_analysis'))
return render_template('core/data_analysis.html', form=form)
Важная строка: send_results.delay()
Мой вопрос: даже с контекстом, почему он выдает эту ошибку во время выполнения?
Ответ №1:
Причина, по которой это происходило, заключалась в том, что само СООБЩЕНИЕ электронной почты создавалось вне with
инструкции, вызывая ошибку времени выполнения.