#node.js #apache-kafka #protocol-buffers #apache-kafka-connect
Вопрос:
Я пытаюсь использовать разъем HDFS kafka для отправки сообщений protobuf от кафки в HDFS. Конфигурация моего соединителя выглядит следующим образом
{
"name": "hdfs3-connector-test",
"config": {
"connector.class": "io.confluent.connect.hdfs3.Hdfs3SinkConnector",
"tasks.max": "1",
"topics": "test-topic",
"hdfs.url": "hdfs://10.8.0.1:9000",
"flush.size": "3",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "io.confluent.connect.protobuf.ProtobufConverter",
"value.converter.schema.registry.url":"http://10.8.0.1:8081",
"confluent.topic.bootstrap.servers": "10.8.0.1:9092",
"confluent.topic.replication.factor": "1"
}
}
Чтобы проверить это, я пытаюсь отправлять сериализованные сообщения protobuf в приложении с небольшим узлом. Вот мои файлы:
// data.proto
syntax = "proto3";
package awesomepackage;
message SearchRequest {
string query = 1;
int32 page = 2;
}
и мое приложение node
const { Kafka } = require('kafkajs')
const protobuf = require('protobufjs')
const kafka = new Kafka({
clientId: 'my-app',
brokers: ['10.8.0.1:9092']
})
const producer = kafka.producer()
const run = async () => {
await producer.connect()
protobuf.load('data.proto', async (err, root) => {
console.log("TESTING")
console.log(err)
let SearchRequest = root.lookupType('awesomepackage.SearchRequest')
let payload = {query: "test", page: 2}
var errMsg = SearchRequest.verify(payload);
console.log(errMsg)
let msg = SearchRequest.create(payload)
var buffer = SearchRequest.encode(msg).finish();
console.log(buffer)
await producer.send({
topic: 'test-topic',
messages: [
{key: 'key1', value: buffer}
]
})
})
}
run()
Однако, когда я запускаю это, я получаю следующие ошибки:
Failed to deserialize data for topic test-topic to Protobuf
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Protobuf message for id -1
Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!
Как мне это исправить? Я предполагаю, что моя схема protobuf не зарегистрирована в реестре схем кафки, но я не уверен. Если это так, есть ли способ отправить схему в реестр с узла?
Ответ №1:
io.confluent.connect.protobuf.ProtobufConverter
требуется реестр схемы, а не обычный сериализованный прототип. Другими словами, в коде узла отсутствует часть реестра схемы (или байтовое создание «обернутого» протосообщения вручную)
Обратитесь к Формату Провода — Слияние
Если вы не хотите использовать реестр схем, вы можете использовать конвертер BlueApron Protobuf, но, похоже, вы его используете, поэтому лучше всего использовать конвертер Confluent
Комментарии:
1. Значит ли это, что мне нужно сначала зарегистрировать схему с помощью API реестра схем, а затем получить идентификатор этой схемы и добавить его в мое сообщение protobuf вместе с волшебным байтом?
2. Это один из вариантов, если только нет библиотеки Кафки узла, которая включает в себя прото-сериализатор, такой как Avro
3. Однако прочитайте комментарий в документах об «индексах сообщений», однако
4. Ах, кажется, есть потенциальная библиотека, которая может справиться с этим: kafkajs.github.io/confluent-schema-registry/docs/usage .
5. Да, поблизости есть несколько библиотек. Я еще не был уверен, какой из них реализовал функциональность, отличную от Avro