Kafka Schemaregistry Protobuf неподдерживаемая корневая схема типа STRING

#apache-kafka #protocol-buffers #confluent-schema-registry

#apache-kafka #буферы протокола #confluent-schema-registry

Вопрос:

Я использую Kafka Connect с соединителем Google PubSub для записи сообщений из gcp PubSub в темы Kafka.

Мой соединитель имеет следующую конфигурацию:

 {
    "name": "MyTopicSourceConnector",
    "config": {
        "connector.class": "com.google.pubsub.kafka.source.CloudPubSubSourceConnector",
        "tasks.max": "10",
        "kafka.topic": "myTopic",
        "cps.project": "my-project-id",
        "cps.subscription": "myTopic-sub",
        "name": "MyTopicSourceConnector",
        "key.converter": "io.confluent.connect.protobuf.ProtobufConverter",
        "key.converter.schema.registry.url": "http://myurl-schema-registry:8081",
        "value.converter": "io.confluent.connect.protobuf.ProtobufConverter",
        "value.converter.schema.registry.url": "http://myurl-schema-registry:8081"
    }
}
  

Схема значений протосообщения выглядит следующим образом:

 syntax = "proto3";

message value_myTopic {
    bytes message = 1;
    string notificationConfig = 2;
    string eventTime = 3;
    string bucketId = 4;
    string payloadFormat = 5;
    string eventType = 6;
    string objectId = 7;
    string objectGeneration = 8;
}
  

Эта настройка работает, когда я использую avro или json (с соответствующими преобразователями), но с Protobuf мой соединитель выдает следующее сообщение об ошибке сразу после его развертывания и завершается сбоем:

 org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:178)
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
        at org.apache.kafka.connect.runtime.WorkerSourceTask.convertTransformedRecord(WorkerSourceTask.java:292)
        at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:321)
        at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:245)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:184)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IllegalArgumentException: Unsupported root schema of type STRING
        at io.confluent.connect.protobuf.ProtobufData.rawSchemaFromConnectSchema(ProtobufData.java:315)
        at io.confluent.connect.protobuf.ProtobufData.fromConnectSchema(ProtobufData.java:304)
        at io.confluent.connect.protobuf.ProtobufData.fromConnectData(ProtobufData.java:109)
        at io.confluent.connect.protobuf.ProtobufConverter.fromConnectData(ProtobufConverter.java:83)
        at org.apache.kafka.connect.storage.Converter.fromConnectData(Converter.java:63)
        at org.apache.kafka.connect.runtime.WorkerSourceTask.lambda$convertTransformedRecord$1(WorkerSourceTask.java:292)
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
        ... 11 more
  

Ответ №1:

Я не знаю, почему у вас возникла эта проблема. В случае, если вам интересно, вместо конвертера Confluent Protobuf существует конвертер сообщества от Blue Apron, и вам просто необходимо добавить другую конфигурацию, чтобы указать, какой класс сериализации Protobuf (de) использовать.

Пример из Snowflake из 2 covnerters можно увидеть здесь.