#apache-kafka
#apache-kafka
Вопрос:
Я пытаюсь настроить потребителя kafka для обработки данных из потоков Kafka. Я смог настроить подключение к потоку, и данные видны, но это смесь специальных символов и ASCII.
Я использую встроенную консоль kafka, но также попробовал версию confluent-kafka на python. Единственные параметры, которые необходимо соблюдать, — это использовать протокол безопасности SASL_PLAINTEXT с SCRAM-SHA-256. Я открыт для использования других методов для анализа выходных данных (по возможности, не Java).
Консоль Kafka
bin/kafka-console-consumer.sh --bootstrap-server server:9092
--topic TOPIC --from-beginning --consumer.config=consumer.properties
Confluent Kafka Python
topics = "TOPIC"
conf = {
"bootstrap.servers": "server:9092",
"group.id": "group",
"security.protocol": "SASL_PLAINTEXT",
"sasl.mechanisms" : "SCRAM-SHA-256",
}
c = Consumer(conf)
c.subscribe([topics])
running = True
while running:
message = c.poll()
print(message.value())
c.close()
Вывод
PLE9K1PKH3S0MAY38ChangeRequest : llZYMEgVmq2CHG:Infra RequestKSUSMAINCHANGEKC-10200-FL01DATA_MISSINGCHGUSD
DATA_MISSINGDATA_MISSINGUSD
CANCEL
▒▒12SLM:Measurement"Schedule(1 = 0)USDUSD▒▒▒
l▒l▒V?▒▒▒
llZYMEgVmq
company_team team_nameTEAM###SGP000000140381PPL000002020234
Latha M▒>▒>▒ChangeRequest
hello:1234543534 cloud abcdef▒▒▒
▒Ի▒
▒▒▒
John Smithjs12345SGP000000140381▒NPPL000002020234
▒Ի▒
Я пытаюсь сначала проанализировать данные на стандартном выходе, но в конце ожидается получение проанализированных данных в базе данных. Любые советы будут оценены.
Комментарии:
1. Не уверен, чего именно вы хотите достичь. Кажется, вы хотите написать потребительское приложение на python? Вам нужно будет настроить правильные десериализаторы для этого случая: docs.confluent.io/current/clients /… и docs.confluent.io/current/clients/confluent-kafka-python /…
Ответ №1:
Похоже, что ваши сообщения закодированы в двоичном формате. Чтобы распечатать их, вам нужно будет настроить двоичный декодер и передать их через него. В случае, если вы создали их с использованием определенной схемы, вам также может потребоваться десериализовать объекты, используя реестр схем, который содержит схему для данной темы. Вы смотрите на что-то в строках:
message_bytes = io.BytesIO(message.value())
decoder = BinaryDecoder(message_bytes)
Комментарии:
1. Я добавил декодер в свой код, но при печати отображается приведенное ниже.
<_io.BytesIO object at 0x7fcb4aa5ba10>
2. Это очень сильно зависит от формата созданных вами сообщений. Вы использовали схему? Как насчет
message.value().decode('ascii'))
Ответ №2:
Как упоминал Джайвалис, похоже, существует несоответствие между вашими производителями и потребителем, которого вы используете для приема данных. Kafka Streams предоставляет два свойства для управления сериализацией и десериализацией данных, которые проходят через топологию; default.value.serde, default.key.serde. Я рекомендую просмотреть конфигурацию вашего приложения streams, чтобы найти подходящий десериализатор для использования потребителем.
https://kafka.apache.org/documentation/#streamsconfigs
Однако обратите внимание, что эти serd могут быть перезаписаны реализацией приложения your streams. Не забудьте также просмотреть свою реализацию, чтобы убедиться, что вы нашли правильный формат сериализации.
Комментарии:
1. Спасибо за информацию. Поток kafka не поддерживается нашей командой, но я постараюсь получить детали схемы. Однако есть ли какой-либо способ получить его от самого потребителя?
2. Данные сериализации не включены в формат сообщений Kafka, поэтому их нет. Однако это не означает, что вы не можете обернуть полезную нагрузку сообщения своей собственной идентификационной информацией. Так работает реестр схем; docs.confluent.io/current/schema-registry /…