#nginx #uwsgi #python-3.8 #apscheduler #spooler
Вопрос:
Я пишу веб-сайт на a Flask
, я использую его для запуска uwsgi nginx
. Нужно было писать а timer
, чтобы периодически выполнять задания. Для этого я использовал uwsgidecorators
. Задача должна проверить состояние задач планировщика. Чтобы получить список задач, я использовал get_jobs()
. Но список, который я продолжаю составлять, пустеет.
webapp/__init__.py:
# -*- coding: utf-8 -*-
from gevent import monkey
monkey.patch_all()
import grpc.experimental.gevent
grpc.experimental.gevent.init_gevent()
from flask import Flask, session, request
from config import DevelopConfig, MqttConfig, MailConfig, ProductionConfig
from flask_sqlalchemy import SQLAlchemy
from flask_migrate import Migrate
from flask_mail import Mail
from flask_script import Manager
from flask_socketio import SocketIO
from flask_mqtt import Mqtt
from flask_login import LoginManager
from flask_babel import Babel
from flask_babel_js import BabelJS
from flask_babel import lazy_gettext as _l
from apscheduler.schedulers.gevent import GeventScheduler
app = Flask(__name__)
app.config.from_object(ProductionConfig)
app.config.from_object(MqttConfig)
app.config.from_object(MailConfig)
db = SQLAlchemy(app)
migrate = Migrate(app, db, render_as_batch=True)
mail = Mail(app)
mqtt = Mqtt(app)
manager = Manager(app, db)
login_manager = LoginManager(app)
login_manager.login_view = 'auth'
login_manager.login_message = _l("Необходимо авторизоваться для доступа к закрытой странице")
login_manager.login_message_category = "error"
scheduler = GeventScheduler()
scheduler.start()
scheduler.add_job(publish_async, args=["Hello"], id="job", trigger='interval', seconds=2)
socketio = SocketIO(app, async_mode='gevent_uwsgi') # Production Version
babel = Babel(app)
babeljs = BabelJS(app=app, view_path='/translations/')
import webapp.views
@babel.localeselector
def get_locale():
# if the user has set up the language manually it will be stored in the session,
# so we use the locale from the user settings
try:
language = session['language']
except KeyError:
language = None
if language is not None:
print(language)
return language
return request.accept_languages.best_match(app.config['LANGUAGES'].keys())
from webapp import models
def publish_async(message):
print(message)
webapp/tasks.py:
from uwsgidecorators import timer
@timer(10, target='spooler')
def check_run_tasks(args):
_list_schedulers = _scheduler_method.get_jobs()
print(_list_schedulers)
wsgi.ini
:
[uwsgi]
env = PYTHONIOENCODING=UTF-8
module = wsgi:app
master = true
# processes = 5
enable-threads = true
gevent = 1024
gevent-monkey-patch = true
buffer-size=32768
# lazy-apps = true
socket = /home/sammy/projectnew/projectnew.sock
socket-timeout = 30
chmod-socket = 664
thunder-lock = true
spooler = /home/sammy/projectnew/webapp/mytasks
import = webapp/tasks.py
vacuum = true
die-on-term = true
wsgi.py
:
# -*- coding: utf-8 -*-
from webapp import app, socketio
if __name__ == '__main__':
socketio.run(app, use_reloader=False)