#node.js #apache-kafka #kafka-node
#node.js #apache-kafka #kafka-узел
Вопрос:
Я пытаюсь использовать kafka-node-avro
библиотека для отправки данных в существующую тему. Я уже добавил схему в SchemaRegistry с помощью curl, а также тему. Я получаю следующую ошибку:
Error: Topic name or id is needed to build Schema
at new Shema (/work/node_modules/kafka-node-avro/lib/schema.js:7:41)
at Object.Pool.getByName (/work/node_modules/kafka-node-avro/lib/schemaPool.js:70:10)
at new Promise (<anonymous>)
Мой фрагмент кода следующий:
const Settings = {
"kafka" : {
"kafkaHost" : config.KafkaHost
},
"schema": {
"registry" : config.KafkaRegistry
}
};
console.log("settings registry: ", config.KafkaRegistry);
console.log("settings kafkaHost: ", config.KafkaHost)
KafkaAvro.init(Settings).then( kafka => {
const producer = kafka.addProducer();
let payloads = [
{
topic: 'classifier-response-test',
messages: JSON.stringify(kafkaData)
}
];
producer.send(payloads).then( success => {
// Message was sent encoded with Avro Schema
console.log("message sent ! Awesome ! :) ")
}, error => {
// Something wrong happen
console.log("There seems that there is a mistake ! try Again ;) ")
console.log(error)
});
} , error => {
// something wrong happen
console.log("There seems that there is a global mistake ! try Again ;) ")
console.log(error)
});
Ответ №1:
Библиотека не готова к send
получению списка сообщений, причина, по которой она жалуется при попытке динамического добавления схемы в пул схем в механизме отправки. Самое простое решение — отправлять по 1 сообщению за раз, в вашем примере кода это может быть похоже
const Settings = {
"kafka" : {
"kafkaHost" : config.KafkaHost
},
"schema": {
"registry" : config.KafkaRegistry
}
};
KafkaAvro.init(Settings).then( kafka => {
kafka.send({
topic: 'classifier-response-test',
messages: JSON.stringify(kafkaData)
}).then( success => {
// Message was sent encoded with Avro Schema
console.log("message sent ! Awesome ! :) ")
}, error => {
// Something wrong happen
console.log("There seems that there is a mistake ! try Again ;) ")
console.log(error)
});
} , error => {
// something wrong happen
console.log("There seems that there is a global mistake ! try Again ;) ")
console.log(error)
});
Спасибо за использование kafka-node-avro
Ответ №2:
Проблема заключалась в том, что мы хотели поместить список тем в настройки схемы, чтобы установить связь между темой и реестром схемы. мы можем указать идентификатор схемы или название темы.
const kafkaSettings = {
"kafka" : {
"kafkaHost" : config.KafkaHost
},
"schema": {
"registry" : config.KafkaRegistry,
"topics": [
{"name": "classifier-response-test" }
]
}
};
Комментарии:
1. если вы добавите подобные темы, вы получите сообщение об ошибке подключения.