Обработка потоков данных в kafka

#apache-kafka

#apache-kafka

Вопрос:

Я пытаюсь настроить потребителя kafka для обработки данных из потоков Kafka. Я смог настроить подключение к потоку, и данные видны, но это смесь специальных символов и ASCII.

Я использую встроенную консоль kafka, но также попробовал версию confluent-kafka на python. Единственные параметры, которые необходимо соблюдать, — это использовать протокол безопасности SASL_PLAINTEXT с SCRAM-SHA-256. Я открыт для использования других методов для анализа выходных данных (по возможности, не Java).

Консоль Kafka

 bin/kafka-console-consumer.sh --bootstrap-server server:9092 
--topic TOPIC --from-beginning --consumer.config=consumer.properties
  

Confluent Kafka Python

 topics = "TOPIC"
conf = {
        "bootstrap.servers": "server:9092",
        "group.id": "group",
        "security.protocol": "SASL_PLAINTEXT",
        "sasl.mechanisms" : "SCRAM-SHA-256",
}
c = Consumer(conf)
c.subscribe([topics])
running = True
while running:
        message = c.poll()
        print(message.value())
c.close()
  

Вывод

 PLE9K1PKH3S0MAY38ChangeRequest : llZYMEgVmq2CHG:Infra RequestKSUSMAINCHANGEKC-10200-FL01DATA_MISSINGCHGUSD
DATA_MISSINGDATA_MISSINGUSD
CANCEL

▒▒12SLM:Measurement"Schedule(1 = 0)USDUSD▒▒▒
                                                              l▒l▒V?▒▒▒
                                                                       llZYMEgVmq
company_team team_nameTEAM###SGP000000140381PPL000002020234
Latha M▒>▒>▒ChangeRequest
hello:1234543534 cloud abcdef▒▒▒
                                                         ▒Ի▒
                                                            ▒▒▒
                                                               John Smithjs12345SGP000000140381▒NPPL000002020234
▒Ի▒
  

Я пытаюсь сначала проанализировать данные на стандартном выходе, но в конце ожидается получение проанализированных данных в базе данных. Любые советы будут оценены.

Комментарии:

1. Не уверен, чего именно вы хотите достичь. Кажется, вы хотите написать потребительское приложение на python? Вам нужно будет настроить правильные десериализаторы для этого случая: docs.confluent.io/current/clients /… и docs.confluent.io/current/clients/confluent-kafka-python /…

Ответ №1:

Похоже, что ваши сообщения закодированы в двоичном формате. Чтобы распечатать их, вам нужно будет настроить двоичный декодер и передать их через него. В случае, если вы создали их с использованием определенной схемы, вам также может потребоваться десериализовать объекты, используя реестр схем, который содержит схему для данной темы. Вы смотрите на что-то в строках:

 message_bytes = io.BytesIO(message.value())
decoder = BinaryDecoder(message_bytes)
  

Комментарии:

1. Я добавил декодер в свой код, но при печати отображается приведенное ниже. <_io.BytesIO object at 0x7fcb4aa5ba10>

2. Это очень сильно зависит от формата созданных вами сообщений. Вы использовали схему? Как насчет message.value().decode('ascii'))

Ответ №2:

Как упоминал Джайвалис, похоже, существует несоответствие между вашими производителями и потребителем, которого вы используете для приема данных. Kafka Streams предоставляет два свойства для управления сериализацией и десериализацией данных, которые проходят через топологию; default.value.serde, default.key.serde. Я рекомендую просмотреть конфигурацию вашего приложения streams, чтобы найти подходящий десериализатор для использования потребителем.

https://kafka.apache.org/documentation/#streamsconfigs

Однако обратите внимание, что эти serd могут быть перезаписаны реализацией приложения your streams. Не забудьте также просмотреть свою реализацию, чтобы убедиться, что вы нашли правильный формат сериализации.

https://kafka.apache.org/21/documentation/streams/developer-guide/datatypes.html#overriding-default-serdes

Комментарии:

1. Спасибо за информацию. Поток kafka не поддерживается нашей командой, но я постараюсь получить детали схемы. Однако есть ли какой-либо способ получить его от самого потребителя?

2. Данные сериализации не включены в формат сообщений Kafka, поэтому их нет. Однако это не означает, что вы не можете обернуть полезную нагрузку сообщения своей собственной идентификационной информацией. Так работает реестр схем; docs.confluent.io/current/schema-registry /…