Доступ к ключу Кафки при входе в функцию с отслеживанием состояния Python Flink

#python #apache-flink #remote-execution #flink-statefun

Вопрос:

Я изучал функции с отслеживанием состояния Flink. Это выглядит очень многообещающе — за исключением одной вещи, — и я надеюсь, что мне ее просто не хватает.

Ни за что на свете я не вижу способа получить доступ к ключу кафки из входа кафки в Python. В Java я вижу, что мог бы использовать десериализатор и эффективно упаковать его в декодированный message объект. Но я не могу найти альтернативу.
В нашем случае ключ содержит ценную информацию, которой нет в значении.

Кто — нибудь сталкивался с этим- или я просто пропустил это?

Ответ №1:

Первое, что я хотел бы упомянуть, это то, что встроенный вход Кафки для удаленных функций требует, чтобы ключ можно было интерпретировать как строку UTF-8. Если это действительно ваш случай, то вы можете просто получить его через:

 def example(context, message):  key = context.address.id   ...   print(key) # utf8 string  

Если это не так, то, к сожалению, на данном этапе вам придется использовать встроенный SDK, чтобы извлечь ключ и значение перед отправкой сообщения удаленной функции.

Комментарии:

1. Идеально — спасибо. Йип — я просто пропустил это