#apache-kafka #ksqldb
#apache-kafka #ksqldb
Вопрос:
извините, если это ясно из документации, но я этого не нашел. В настоящее время я следую этому руководству и адаптирую его к своим потребностям, и я сталкиваюсь с некоторыми проблемами, и мне интересно, будет ли это правильным для меня.
https://medium.com/@tomershaiman/tutorial-building-a-s3-parquet-datalake-without-a-single-line-of-code-5ea4455edc1e То, что я хочу сделать, очень похоже на то, что представьте ситуацию, которая у меня есть.
TL / DR: У меня есть данные в Kafka в разделе в формате Protobuf, я хочу создать поток, который преобразует этот protobuf в новую тему в формате Avro, и как только он будет в формате avro, у меня будет соединитель, который будет использовать его и сбрасывать в корзину s3.
Теперь представьте, что у меня есть тема Kafka SearchRequest_proto в формате protobuf, затем я хочу создать тему SearchRequest_avro в формате avro
например, мой protobuf был бы
syntax = "proto3";
message SearchRequest {
string query = 1;
int32 page_number = 2;
int32 result_per_page = 3;
}
мой первый вопрос заключается в том, требует ли мой поток SearchRequest ksql всех полей или я могу сделать что-то вроде этого?
CREATE STREAM SearchRequest_proto (query VARCHAR, page_number INT, result_per_page INT) WITH (KAFKA_TOPIC='SearchRequest_proto', VALUE_FORMAT='PROTOBUF');
Или я могу сделать что-то вроде этого:
CREATE STREAM SearchRequest_proto (query VARCHAR, page_number INT) WITH (KAFKA_TOPIC='SearchRequest_proto', VALUE_FORMAT='PROTOBUF');
Мой вопрос возникает потому, что у меня есть немного более сложный прототип, и я пытаюсь протестировать только некоторые поля, чтобы не нужно было выполнять их все, и когда я создаю свой второй поток из первого, кажется, ничего не выходит.
CREATE STREAM SearchRequest_avro WITH (KAFKA_TOPIC='SearchRequest_avro', REPLICAS=1, PARTITIONS=1, VALUE_FORMAT='AVRO') AS SELECT * FROM SearchRequest_proto;
Также впоследствии, если я перейду в свои группы потребителей kafka, я смогу увидеть второй поток, зарегистрированный как потребитель в kafka.
Моя первая тема с protobuf содержит сообщения, но почему-то я даже не могу использовать печать в своих темах, теперь, когда все показано, я получаю это сообщение:
ksql> show streams;
Stream Name | Kafka Topic | Format
--------------------------------------------------------
OBJ_POS_AVRO | com.obj_pos_avro | AVRO
OBJ_POS_PROTO | com.obj_pos_proto | PROTOBUF
--------------------------------------------------------
ksql> print "com.obj_pos_proto";
Could not find topic 'com.obj_pos_proto', or the KSQL user does not have permissions to list the topic. Topic names are case-sensitive.
ksql> print "com.obj_pos_avro";
Could not find topic 'com.obj_pos_avro', or the KSQL user does not have permissions to list the topic. Topic names are case-sensitive.
Мой вопрос возникает потому, что, поскольку я вижу, что потребитель зарегистрирован, но без какого-либо смещения, мне интересно, потому что я неявно не объявлял все поля в моем protobuf как часть потока, если из-за этого происходит сбой? Или, может быть, это что-то другое.
Также дополнительный момент, если кто-нибудь знает, я немного поискал в Google, но не нашел, могу ли я каким-либо образом использовать реестр schema для регистрации моих protobufs, чтобы потоки автоматически могли читать его без необходимости указывать все эти поля?
Или, может быть, любая библиотека, которая может использовать protobufs и генерировать файлы формата stream или avro?
спасибо за любые отзывы, извините за длинный пост, также, как вы можете себе представить, я определенно не очень разбираюсь в этих темах kafka, поэтому для меня это что-то новое
РЕДАКТИРОВАТЬ: я провел быстрый тест самостоятельно, и действительно, он поддерживает просто меньшее количество полей, так что это не будет проблемой. Однако я получаю ошибку с сериализацией, которая, должно быть, связана с некоторой проблемой конфигурации на моей стороне:
[2020-09-21 08:19:32,836] INFO KafkaProtobufDeserializerConfig values:
bearer.auth.token = [hidden]
proxy.port = -1
schema.reflection = false
auto.register.schemas = true
max.schemas.per.subject = 1000
basic.auth.credentials.source = URL
value.subject.name.strategy = class io.confluent.kafka.serializers.subject.TopicNameStrategy
schema.registry.url = [http://confluent-schema-registry-svc:8081]
basic.auth.user.info = [hidden]
proxy.host =
specific.protobuf.value.type = class java.lang.Object
use.latest.version = false
schema.registry.basic.auth.user.info = [hidden]
bearer.auth.credentials.source = STATIC_TOKEN
derive.type = false
specific.protobuf.key.type = class java.lang.Object
key.subject.name.strategy = class io.confluent.kafka.serializers.subject.TopicNameStrategy
(io.confluent.kafka.serializers.protobuf.KafkaProtobufDeserializerConfig)
[2020-09-21 08:19:32,836] INFO ProtobufDataConfig values:
schemas.cache.config = 1000
enhanced.protobuf.schema.support = false
(io.confluent.connect.protobuf.ProtobufDataConfig)
[2020-09-21 08:19:32,841] INFO JsonConverterConfig values:
converter.type = value
decimal.format = BASE64
schemas.cache.size = 1000
schemas.enable = false
(org.apache.kafka.connect.json.JsonConverterConfig)
[2020-09-21 08:19:32,844] ERROR {"type":0,"deserializationError":{"errorMessage":"Error deserializing message from topic: com.obj_pos_proto","recordB64":null,"cause":["Failed to deserialize data for topic com.obj_pos_proto to Protobuf: ","Error deserializing Protobuf message for id -1","Unknown magic byte!"]},"recordProcessingError":null,"productionError":null} (processing.CSAS_TARGET_AVRO_3.KsqlTopic.Source.deserializer)
[2020-09-21 08:19:32,845] WARN Exception caught during Deserialization, taskId: 0_2, topic: com.obj_pos_proto, partition: 2, offset: 0 (org.apache.kafka.streams.processor.internals.StreamThread)
org.apache.kafka.common.errors.SerializationException: Error deserializing message from topic: com.obj_pos_proto
Caused by: org.apache.kafka.connect.errors.DataException: Failed to deserialize data for topic com.obj_pos_proto to Protobuf:
at io.confluent.connect.protobuf.ProtobufConverter.toConnectData(ProtobufConverter.java:123)
at io.confluent.ksql.serde.connect.KsqlConnectDeserializer.deserialize(KsqlConnectDeserializer.java:45)
at io.confluent.ksql.serde.tls.ThreadLocalDeserializer.deserialize(ThreadLocalDeserializer.java:37)
at io.confluent.ksql.serde.GenericRowSerDe$GenericRowDeserializer.deserialize(GenericRowSerDe.java:300)
at io.confluent.ksql.serde.GenericRowSerDe$GenericRowDeserializer.deserialize(GenericRowSerDe.java:285)
at io.confluent.ksql.logging.processing.LoggingDeserializer.deserialize(LoggingDeserializer.java:46)
at org.apache.kafka.common.serialization.Deserializer.deserialize(Deserializer.java:60)
at org.apache.kafka.streams.processor.internals.SourceNode.deserializeValue(SourceNode.java:63)
at org.apache.kafka.streams.processor.internals.RecordDeserializer.deserialize(RecordDeserializer.java:66)
at org.apache.kafka.streams.processor.internals.RecordQueue.updateHead(RecordQueue.java:175)
at org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:112)
at org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:162)
at org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:765)
at org.apache.kafka.streams.processor.internals.StreamThread.addRecordsToTasks(StreamThread.java:943)
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:764)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:697)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:670)
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Protobuf message for id -1
Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!
Ответ №1:
Я полагаю, что нашел причину, по которой у меня возникла эта проблема.
Что я хотел сделать, так это создать поток без ключаhttps://docs.ksqldb.io/en/latest/developer-guide/ksqldb-reference/create-stream/#example
-- keyless stream, with value columns loaded from Schema Registry:
CREATE STREAM pageviews WITH (
KAFKA_TOPIC = 'keyless-pageviews-topic',
VALUE_FORMAT = 'JSON'
);
причина, по которой это не удалось, заключалась в том, что мой производитель не связывался с моим реестром схемы, поэтому, когда я пытался десериализовать данные, это всегда приводило к сбою, потому что реестр на самом деле не работал должным образом