#python-3.x #protocol-buffers #apache-flink #flink-statefun #protobuf-python
#python-3.x #протокол-буферы #apache-flink #flink-statefun #protobuf-python
Вопрос:
Я пытаюсь создать проект, используя функцию отслеживания состояния Apache Flink на Python, но, похоже, я не могу заставить ее работать. Я сузил проблему до того, что, похоже, когда я отправляю запрос к моей функции отслеживания состояния через мою схему protobuf, сериализатор не может сериализовать мое сообщение в класс, который я ожидаю. Вот что я пытаюсь сделать:
import json
from statefun import StatefulFunctions, RequestReplyHandler
from jobs.session_event_pb2 import Event
functions = StatefulFunctions()
@functions.bind("namespace/funcname")
def funcname(context, session: Event):
print("hello world")
handler = RequestReplyHandler(functions)
if __name__ == '__main__':
inputFile = open("my_file.json", "r")
for line in inputFile:
data = json.loads(line).get('properties')
if data is not None and data.get('prop1') is not None and data.get('prop2') is not None:
request = Event()
request.prop1 = data["prop1"]
request.prop2 = data["prop2"]
request = request.SerializeToString()
handler(request)
Вот моя схема Protobuf:
syntax = "proto3";
package mypackage;
message Event {
string prop1 = 1;
string prop2 = 2;
}
Что я здесь делаю не так?
Ответ №1:
Это потому, что обработчик RequestReply не принимает прямые сообщения protobuf. Среда выполнения Flink отправляет тип с именем ToFunction
и получает ответ типа FromFunction
. Эта полезная нагрузка содержит ваши сообщения вызывающего абонента вместе с сохраненными значениями и другой метаинформацией.
Если вы не можете вызвать функции напрямую, например, в тесте, я бы посоветовал вам сделать это и вообще не использовать обработчик.
Комментарии:
1. Спасибо! Так почему же тогда в их примере начального кода ( ci.apache.org/projects/flink/flink-statefun-docs-master/… ) могут ли они отправить тело HTTP-запроса из Flask непосредственно в функцию? Чем отличается тело HTTP-запроса?
2. Вы можете вызвать ее вручную, если используете правильный тип «ToFunction» и «FromFunction», которые поступают через flask github.com/apache/flink-statefun/blob /…
3. Значит, фреймворк Flask использует эти классы ToFunction и FromFunction по умолчанию?
4. RequestReplyHandler выполняет