отправить сообщение avro в тему kafka

#node.js #apache-kafka #kafka-node

#node.js #apache-kafka #kafka-узел

Вопрос:

Я пытаюсь использовать kafka-node-avro

библиотека для отправки данных в существующую тему. Я уже добавил схему в SchemaRegistry с помощью curl, а также тему. Я получаю следующую ошибку:

 Error: Topic name or id is needed to build Schema
    at new Shema (/work/node_modules/kafka-node-avro/lib/schema.js:7:41)
    at Object.Pool.getByName (/work/node_modules/kafka-node-avro/lib/schemaPool.js:70:10)
    at new Promise (<anonymous>)
  

Мой фрагмент кода следующий:

 const Settings = {
    "kafka" : {
        "kafkaHost" : config.KafkaHost
    },
    "schema": {
        "registry" : config.KafkaRegistry
    }
};

console.log("settings registry: ", config.KafkaRegistry);
console.log("settings kafkaHost: ", config.KafkaHost)
KafkaAvro.init(Settings).then( kafka => {
const producer = kafka.addProducer();
let payloads = [
    {
        topic: 'classifier-response-test',
        messages: JSON.stringify(kafkaData)
    }
];
producer.send(payloads).then( success => {
// Message was sent encoded with Avro Schema
    console.log("message sent ! Awesome ! :) ")
}, error => {
        // Something wrong happen
        console.log("There seems that there is a mistake ! try Again ;) ")
        console.log(error)
    });
} , error => {
    // something wrong happen
    console.log("There seems that there is a global mistake ! try Again ;) ")
    console.log(error)
});
  

Ответ №1:

Библиотека не готова к send получению списка сообщений, причина, по которой она жалуется при попытке динамического добавления схемы в пул схем в механизме отправки. Самое простое решение — отправлять по 1 сообщению за раз, в вашем примере кода это может быть похоже

 const Settings = {
  "kafka" : {
    "kafkaHost" : config.KafkaHost
  },
  "schema": {
    "registry" : config.KafkaRegistry
  }
};

KafkaAvro.init(Settings).then( kafka => {
  kafka.send({
    topic: 'classifier-response-test',
    messages: JSON.stringify(kafkaData)
  }).then( success => {
    // Message was sent encoded with Avro Schema
    console.log("message sent ! Awesome ! :) ")
  }, error => {
    // Something wrong happen
    console.log("There seems that there is a mistake ! try Again ;) ")
    console.log(error)
  });
} , error => {
  // something wrong happen
  console.log("There seems that there is a global mistake ! try Again ;) ")
  console.log(error)
});
  

Спасибо за использование kafka-node-avro

Ответ №2:

Проблема заключалась в том, что мы хотели поместить список тем в настройки схемы, чтобы установить связь между темой и реестром схемы. мы можем указать идентификатор схемы или название темы.

 const kafkaSettings  = {
    "kafka" : {
      "kafkaHost" : config.KafkaHost
    },
    "schema": {
      "registry" : config.KafkaRegistry,
      "topics": [
        {"name": "classifier-response-test" }
    ]
    }
  };
  

Комментарии:

1. если вы добавите подобные темы, вы получите сообщение об ошибке подключения.