Функция отслеживания состояния Apache Flink — проблема с сериализацией?

#python-3.x #protocol-buffers #apache-flink #flink-statefun #protobuf-python

#python-3.x #протокол-буферы #apache-flink #flink-statefun #protobuf-python

Вопрос:

Я пытаюсь создать проект, используя функцию отслеживания состояния Apache Flink на Python, но, похоже, я не могу заставить ее работать. Я сузил проблему до того, что, похоже, когда я отправляю запрос к моей функции отслеживания состояния через мою схему protobuf, сериализатор не может сериализовать мое сообщение в класс, который я ожидаю. Вот что я пытаюсь сделать:

 import json
from statefun import StatefulFunctions, RequestReplyHandler
from jobs.session_event_pb2 import Event

functions = StatefulFunctions()


@functions.bind("namespace/funcname")
def funcname(context, session: Event):
    print("hello world")


handler = RequestReplyHandler(functions)

if __name__ == '__main__':
    inputFile = open("my_file.json", "r")
    for line in inputFile:
        data = json.loads(line).get('properties')
        if data is not None and data.get('prop1') is not None and data.get('prop2') is not None:
            request = Event()
            request.prop1 = data["prop1"]
            request.prop2 = data["prop2"]
            request = request.SerializeToString()
            handler(request)
  

Вот моя схема Protobuf:

 syntax = "proto3";

package mypackage;

message Event {
    string prop1 = 1;
    string prop2 = 2;
}
  

Что я здесь делаю не так?

Ответ №1:

Это потому, что обработчик RequestReply не принимает прямые сообщения protobuf. Среда выполнения Flink отправляет тип с именем ToFunction и получает ответ типа FromFunction . Эта полезная нагрузка содержит ваши сообщения вызывающего абонента вместе с сохраненными значениями и другой метаинформацией.

Если вы не можете вызвать функции напрямую, например, в тесте, я бы посоветовал вам сделать это и вообще не использовать обработчик.

Комментарии:

1. Спасибо! Так почему же тогда в их примере начального кода ( ci.apache.org/projects/flink/flink-statefun-docs-master/… ) могут ли они отправить тело HTTP-запроса из Flask непосредственно в функцию? Чем отличается тело HTTP-запроса?

2. Вы можете вызвать ее вручную, если используете правильный тип «ToFunction» и «FromFunction», которые поступают через flask github.com/apache/flink-statefun/blob /…

3. Значит, фреймворк Flask использует эти классы ToFunction и FromFunction по умолчанию?

4. RequestReplyHandler выполняет