Узел: Отправка сообщения Protobuf Кафке об ошибке

#node.js #apache-kafka #protocol-buffers #apache-kafka-connect

Вопрос:

Я пытаюсь использовать разъем HDFS kafka для отправки сообщений protobuf от кафки в HDFS. Конфигурация моего соединителя выглядит следующим образом

 {
    "name": "hdfs3-connector-test",
    "config": {
        "connector.class": "io.confluent.connect.hdfs3.Hdfs3SinkConnector",
        "tasks.max": "1",
        "topics": "test-topic",
        "hdfs.url": "hdfs://10.8.0.1:9000",
        "flush.size": "3",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "value.converter": "io.confluent.connect.protobuf.ProtobufConverter",
        "value.converter.schema.registry.url":"http://10.8.0.1:8081",
        "confluent.topic.bootstrap.servers": "10.8.0.1:9092",
        "confluent.topic.replication.factor": "1"
    }
}
 

Чтобы проверить это, я пытаюсь отправлять сериализованные сообщения protobuf в приложении с небольшим узлом. Вот мои файлы:

 // data.proto

syntax = "proto3";
package awesomepackage;

message SearchRequest {
  string query = 1;
  int32 page = 2;
}
 

и мое приложение node

 const { Kafka } = require('kafkajs')
const protobuf = require('protobufjs')

const kafka = new Kafka({
    clientId: 'my-app',
    brokers: ['10.8.0.1:9092']
})

const producer = kafka.producer()


const run = async () => {
    await producer.connect()

    protobuf.load('data.proto', async (err, root) => {
        console.log("TESTING")
        console.log(err)
    
        let SearchRequest = root.lookupType('awesomepackage.SearchRequest')
        let payload = {query: "test", page: 2} 
    
        var errMsg = SearchRequest.verify(payload);
        console.log(errMsg)
    
        let msg = SearchRequest.create(payload)
        var buffer = SearchRequest.encode(msg).finish();
        console.log(buffer)
        await producer.send({
            topic: 'test-topic',
            messages: [
                {key: 'key1', value: buffer}
            ]
        })
    })
    
}

run()
 

Однако, когда я запускаю это, я получаю следующие ошибки:

 Failed to deserialize data for topic test-topic to Protobuf

Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Protobuf message for id -1

Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!
 

Как мне это исправить? Я предполагаю, что моя схема protobuf не зарегистрирована в реестре схем кафки, но я не уверен. Если это так, есть ли способ отправить схему в реестр с узла?

Ответ №1:

io.confluent.connect.protobuf.ProtobufConverter требуется реестр схемы, а не обычный сериализованный прототип. Другими словами, в коде узла отсутствует часть реестра схемы (или байтовое создание «обернутого» протосообщения вручную)

Обратитесь к Формату Провода — Слияние


Если вы не хотите использовать реестр схем, вы можете использовать конвертер BlueApron Protobuf, но, похоже, вы его используете, поэтому лучше всего использовать конвертер Confluent

Комментарии:

1. Значит ли это, что мне нужно сначала зарегистрировать схему с помощью API реестра схем, а затем получить идентификатор этой схемы и добавить его в мое сообщение protobuf вместе с волшебным байтом?

2. Это один из вариантов, если только нет библиотеки Кафки узла, которая включает в себя прото-сериализатор, такой как Avro

3. Однако прочитайте комментарий в документах об «индексах сообщений», однако

4. Ах, кажется, есть потенциальная библиотека, которая может справиться с этим: kafkajs.github.io/confluent-schema-registry/docs/usage .

5. Да, поблизости есть несколько библиотек. Я еще не был уверен, какой из них реализовал функциональность, отличную от Avro