#python #python-3.x #flask #socket.io #flask-socketio
Вопрос:
Я застрял с этой проблемой на 2 дня. Я перепробовал множество вещей, но они, похоже, не работали на меня. Я не могу выдать событие.
Main.py —
from flask import Flask, g
from flask_socketio import SocketIO, emit
app = Flask(__name__)
app.config['SECRET_KEY'] = 'secret!'
socketio = SocketIO(app, cors_allowed_origins="*")
@socketio.on('connect')
def handle_connection():
from notification_handler import Notification_Handler
if __name__ == '__main__':
server_ip = 'SERVER_IP'
with app.app_context() :
socketio.run(app, host=server_ip, port=5000)
Notification_Handler.py
from flask import request, g
from flask_socketio import SocketIO, emit
try:
from __main__ import socketio, app
except ImportError:
from application import socketio, app
class Notification_Handler(object):
@staticmethod
def store_notification(notification):
lastnotification = Notification_Handler.fetch_notification()
resp = {}
if (lastnotification == None) :
resp["status"] = False
resp["message"] = "No notifications available"
with app.app_context():
emit('notification_response',resp,namespace='/',broadcast=True)
Во время работы я получаю эту ошибку —
Working outside of application context.
This typically means that you attempted to use functionality that needed to
interface with the current application object in some way. To solve
this, set up an application context with app.app_context(). See the
documentation for more information.
Traceback (most recent call last):
File "/home/project/release/module/someother.py", line 225, in abc_communication
Notification_Handler.store_notification(notification)
File "/home/project/release/module/notification_handler.py", line 73, in store_notification
emit('notification_response',resp,namespace='/',broadcast=True)
File "/home/project/venv/lib/python3.8/site-packages/flask_socketio/__init__.py", line 782, in emit
socketio = flask.current_app.extensions['socketio']
File "/home/project/venv/lib/python3.8/site-packages/werkzeug/local.py", line 422, in __get__
obj = instance._get_current_object()
File "/home/project/venv/lib/python3.8/site-packages/werkzeug/local.py", line 544, in _get_current_object
return self.__local() # type: ignore
File "/home/project/venv/lib/python3.8/site-packages/flask/globals.py", line 52, in _find_app
raise RuntimeError(_app_ctx_err_msg)
RuntimeError: Working outside of application context.
Я пробовал такие вещи, как использование
socketio.emit('notification_response',resp,namespace='/',broadcast=True)
as well.
Note — I have a single user, so need to maintain the socket Ids. Also, I am able to connect to flask socket io from Angular socket io. The other files which have emit events are working fine, only this file is not working for me.
I am new to python and with socketio.
Edit 1-
The other file which is working —
from . notification_handler import Notification_Handler
import json
from flask_socketio import SocketIO, emit
try:
from __main__ import socketio
except ImportError:
from application import socketio
Class Notification_Impl(Resource):
@socketio.on('notification_sending_event')
def get(self):
notification = Notification_Handler.fetch_notification()
resp = {}
if (notification == None) :
resp["status"] = False
resp["message"] = "No notifications available"
else :
resp["status"] = True
resp["message"] = json.loads(notification)
emit('notification_response',resp, namespace='/', broadcast=True)
В этом файле нет @staticmethod.
Служба пользовательского интерфейса (Угловая) —
import { Injectable } from '@angular/core';
import * as io from 'socket.io-client';
import { Observable, BehaviorSubject } from "rxjs";
import {BASE_URL} from 'src/app/configs/app.config'
@Injectable()
export class SocketService {
socket:any;
notificationResponse = new BehaviorSubject({});
constructor() {
this.socket = io(BASE_URL '/');
}
listen(event: string){
return new Observable((subscriber)=>{
this.socket.on(event, (data)=>{
subscriber.next(data);
})
})
}
emit(event:string, data:any){
this.socket.emit(event, data);
}
}
Файл компонента —
this.socketService.emit('notification_sending_event',{'data':'Connected'});
this.socketService.listen('notification_response').subscribe((res)=>{
console.log(res);
});
В notification_handler.py, когда происходит выброс, возникает эта ошибка, или иногда пользовательский интерфейс не может ее прослушать, но когда выброс происходит из другого файла, пользовательский интерфейс прослушивает.
Комментарии:
1. @miguel, пожалуйста, помогите
Ответ №1:
Перед вызовом функции необходимо создать контекст приложения emit()
:
with app.app_context():
emit('notification_response', resp, namespace='/', broadcast=True)
Комментарии:
1. Спасибо, что ответили. Когда я делаю то же самое с другими файлами и использую внутри них app.app_context, это работает. Но не в этом файле, т. е. Notification_Handler.py. Нужно ли мне также указывать пространство имен в коде пользовательского интерфейса, в котором я прослушиваю это событие?
2. Вы не можете просто сказать «это не работает», так как это не дает мне никакой дополнительной информации, которая помогла бы вам решить проблему. Обновите свой вопрос новой ошибкой, которую вы получаете, когда контекст приложения настроен правильно, как я указал.
3. Я обновил вопрос с учетом того, что я написал.
4. Мне жаль, но я не совсем понимаю, правильно ли вы это делаете. Обновленный код с приложением.app_context поврежден,
app
импорт не выполняется, поэтому я подозреваю, что вы просто отредактировали это вручную в вопросе. Ошибка не может быть той же самой после того, как вы нажмете контекст, по крайней мере, номер строки ошибки должен был измениться, поэтому вы на самом деле не делаете то, о чем я прошу.5. Извините, приложение импортировано, и контекст приложения уже перемещается, как вы предлагали ранее. Теперь ошибки нет, но эмиссии не происходит, так как пользовательский интерфейс не может ее прослушивать.