требуют ли потоки ksqldb, чтобы обе схемы имели одинаковое количество полей?

#apache-kafka #ksqldb

#apache-kafka #ksqldb

Вопрос:

извините, если это ясно из документации, но я этого не нашел. В настоящее время я следую этому руководству и адаптирую его к своим потребностям, и я сталкиваюсь с некоторыми проблемами, и мне интересно, будет ли это правильным для меня.

https://medium.com/@tomershaiman/tutorial-building-a-s3-parquet-datalake-without-a-single-line-of-code-5ea4455edc1e То, что я хочу сделать, очень похоже на то, что представьте ситуацию, которая у меня есть.

TL / DR: У меня есть данные в Kafka в разделе в формате Protobuf, я хочу создать поток, который преобразует этот protobuf в новую тему в формате Avro, и как только он будет в формате avro, у меня будет соединитель, который будет использовать его и сбрасывать в корзину s3.

Теперь представьте, что у меня есть тема Kafka SearchRequest_proto в формате protobuf, затем я хочу создать тему SearchRequest_avro в формате avro

например, мой protobuf был бы

 syntax = "proto3";

message SearchRequest {
  string query = 1;
  int32 page_number = 2;
  int32 result_per_page = 3;
}
  

мой первый вопрос заключается в том, требует ли мой поток SearchRequest ksql всех полей или я могу сделать что-то вроде этого?

 CREATE STREAM SearchRequest_proto (query VARCHAR, page_number INT, result_per_page INT) WITH (KAFKA_TOPIC='SearchRequest_proto', VALUE_FORMAT='PROTOBUF');
  

Или я могу сделать что-то вроде этого:

 CREATE STREAM SearchRequest_proto (query VARCHAR, page_number INT) WITH (KAFKA_TOPIC='SearchRequest_proto', VALUE_FORMAT='PROTOBUF');
  

Мой вопрос возникает потому, что у меня есть немного более сложный прототип, и я пытаюсь протестировать только некоторые поля, чтобы не нужно было выполнять их все, и когда я создаю свой второй поток из первого, кажется, ничего не выходит.

 CREATE STREAM SearchRequest_avro WITH (KAFKA_TOPIC='SearchRequest_avro', REPLICAS=1, PARTITIONS=1, VALUE_FORMAT='AVRO') AS SELECT * FROM SearchRequest_proto;
  

Также впоследствии, если я перейду в свои группы потребителей kafka, я смогу увидеть второй поток, зарегистрированный как потребитель в kafka.
Моя первая тема с protobuf содержит сообщения, но почему-то я даже не могу использовать печать в своих темах, теперь, когда все показано, я получаю это сообщение:

 ksql> show streams;

 Stream Name   | Kafka Topic                 | Format   
--------------------------------------------------------
 OBJ_POS_AVRO  | com.obj_pos_avro            | AVRO     
 OBJ_POS_PROTO | com.obj_pos_proto           | PROTOBUF 
--------------------------------------------------------
ksql> print "com.obj_pos_proto";
Could not find topic 'com.obj_pos_proto', or the KSQL user does not have permissions to list the topic. Topic names are case-sensitive.
ksql> print "com.obj_pos_avro";
Could not find topic 'com.obj_pos_avro', or the KSQL user does not have permissions to list the topic. Topic names are case-sensitive.
  

Мой вопрос возникает потому, что, поскольку я вижу, что потребитель зарегистрирован, но без какого-либо смещения, мне интересно, потому что я неявно не объявлял все поля в моем protobuf как часть потока, если из-за этого происходит сбой? Или, может быть, это что-то другое.

Также дополнительный момент, если кто-нибудь знает, я немного поискал в Google, но не нашел, могу ли я каким-либо образом использовать реестр schema для регистрации моих protobufs, чтобы потоки автоматически могли читать его без необходимости указывать все эти поля?

Или, может быть, любая библиотека, которая может использовать protobufs и генерировать файлы формата stream или avro?

спасибо за любые отзывы, извините за длинный пост, также, как вы можете себе представить, я определенно не очень разбираюсь в этих темах kafka, поэтому для меня это что-то новое

РЕДАКТИРОВАТЬ: я провел быстрый тест самостоятельно, и действительно, он поддерживает просто меньшее количество полей, так что это не будет проблемой. Однако я получаю ошибку с сериализацией, которая, должно быть, связана с некоторой проблемой конфигурации на моей стороне:

 [2020-09-21 08:19:32,836] INFO KafkaProtobufDeserializerConfig values: 
    bearer.auth.token = [hidden]
    proxy.port = -1
    schema.reflection = false
    auto.register.schemas = true
    max.schemas.per.subject = 1000
    basic.auth.credentials.source = URL
    value.subject.name.strategy = class io.confluent.kafka.serializers.subject.TopicNameStrategy
    schema.registry.url = [http://confluent-schema-registry-svc:8081]
    basic.auth.user.info = [hidden]
    proxy.host = 
    specific.protobuf.value.type = class java.lang.Object
    use.latest.version = false
    schema.registry.basic.auth.user.info = [hidden]
    bearer.auth.credentials.source = STATIC_TOKEN
    derive.type = false
    specific.protobuf.key.type = class java.lang.Object
    key.subject.name.strategy = class io.confluent.kafka.serializers.subject.TopicNameStrategy
 (io.confluent.kafka.serializers.protobuf.KafkaProtobufDeserializerConfig)
[2020-09-21 08:19:32,836] INFO ProtobufDataConfig values: 
    schemas.cache.config = 1000
    enhanced.protobuf.schema.support = false
 (io.confluent.connect.protobuf.ProtobufDataConfig)
[2020-09-21 08:19:32,841] INFO JsonConverterConfig values: 
    converter.type = value
    decimal.format = BASE64
    schemas.cache.size = 1000
    schemas.enable = false
 (org.apache.kafka.connect.json.JsonConverterConfig)
[2020-09-21 08:19:32,844] ERROR {"type":0,"deserializationError":{"errorMessage":"Error deserializing message from topic: com.obj_pos_proto","recordB64":null,"cause":["Failed to deserialize data for topic com.obj_pos_proto to Protobuf: ","Error deserializing Protobuf message for id -1","Unknown magic byte!"]},"recordProcessingError":null,"productionError":null} (processing.CSAS_TARGET_AVRO_3.KsqlTopic.Source.deserializer)
[2020-09-21 08:19:32,845] WARN Exception caught during Deserialization, taskId: 0_2, topic: com.obj_pos_proto, partition: 2, offset: 0 (org.apache.kafka.streams.processor.internals.StreamThread)
org.apache.kafka.common.errors.SerializationException: Error deserializing message from topic: com.obj_pos_proto
Caused by: org.apache.kafka.connect.errors.DataException: Failed to deserialize data for topic com.obj_pos_proto to Protobuf: 
    at io.confluent.connect.protobuf.ProtobufConverter.toConnectData(ProtobufConverter.java:123)
    at io.confluent.ksql.serde.connect.KsqlConnectDeserializer.deserialize(KsqlConnectDeserializer.java:45)
    at io.confluent.ksql.serde.tls.ThreadLocalDeserializer.deserialize(ThreadLocalDeserializer.java:37)
    at io.confluent.ksql.serde.GenericRowSerDe$GenericRowDeserializer.deserialize(GenericRowSerDe.java:300)
    at io.confluent.ksql.serde.GenericRowSerDe$GenericRowDeserializer.deserialize(GenericRowSerDe.java:285)
    at io.confluent.ksql.logging.processing.LoggingDeserializer.deserialize(LoggingDeserializer.java:46)
    at org.apache.kafka.common.serialization.Deserializer.deserialize(Deserializer.java:60)
    at org.apache.kafka.streams.processor.internals.SourceNode.deserializeValue(SourceNode.java:63)
    at org.apache.kafka.streams.processor.internals.RecordDeserializer.deserialize(RecordDeserializer.java:66)
    at org.apache.kafka.streams.processor.internals.RecordQueue.updateHead(RecordQueue.java:175)
    at org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:112)
    at org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:162)
    at org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:765)
    at org.apache.kafka.streams.processor.internals.StreamThread.addRecordsToTasks(StreamThread.java:943)
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:764)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:697)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:670)
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Protobuf message for id -1
Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!
  

Ответ №1:

Я полагаю, что нашел причину, по которой у меня возникла эта проблема.

Что я хотел сделать, так это создать поток без ключаhttps://docs.ksqldb.io/en/latest/developer-guide/ksqldb-reference/create-stream/#example

 -- keyless stream, with value columns loaded from Schema Registry:
CREATE STREAM pageviews WITH (
    KAFKA_TOPIC = 'keyless-pageviews-topic',
    VALUE_FORMAT = 'JSON'
  );
  

причина, по которой это не удалось, заключалась в том, что мой производитель не связывался с моим реестром схемы, поэтому, когда я пытался десериализовать данные, это всегда приводило к сбою, потому что реестр на самом деле не работал должным образом