Как мы можем настроить value.subject.name.strategy для схем в nodejs Kafka producers?

#node.js #apache-kafka #confluent-schema-registry

#node.js #апачи-кафка #confluent-schema-реестр

Вопрос:

Как мы можем настроить стратегию именования объектов в схеме avro, я хочу настроить key.subject.name.strategy и value.subject.name.strategy. Я знаю, что это можно было бы сделать на Java, но не смог найти способ сделать это в node. До сих пор я работал с приведенным ниже кодом.

 function getSchemaRegistryClient() {
        let saslUsername = ACCESS_KEY;
        let saslPassword = SECRET_KEY;
        let schemaRegistryUrl = SCHEMA_REGISTRY_URL;

        var registryClient = new SchemaRegistry({
            host: `https://${saslUsername}:${saslPassword}@${schemaRegistryUrl}`
        });

    return registryClient;
}

function getKafkaClient() {
  console.log('Fetching Kafka Client');

  var kafkaClient = new Kafka({
    brokers: ['url'],
    sasl: {
      mechanism: 'SCRAM-SHA-512',
      username: USERNAME,
      password: PASSWORD
    },
  });
  return kafkaClient;
}

function getKafkaProducerClient() {
      console.log('Trying to fetch kafka producer');

      var producer = getKafkaClient().producer();
      return producer;
}

async function publishToProton(eventToPublish) {

    console.log(`Event before publishing ${JSON.stringify(event)}`); 
    
    var registryClient = getSchemaRegistryClient();

    var encodedMessage = await registryClient.encode(registryId, event);
    let sendParams = {
        topic: topicName,
        messages: [
            {
                key: 'EventKey',
                value: encodedMessage
            }
        ]
    };

    let producer = getKafkaProducerClient();

    await producer.connect();
    await producer.send(sendParams);
    await producer.disconnect();
    console.log(`Proton event is published`);
};

try {
    publishToProton(event);
}catch(e){
    console.log(e);
}
 

Комментарии:

1. Я использую библиотеки @kafkajs/ confluent-schema-registry и kafkajs.

2. Вероятно, для этого лучше обратиться к проблеме github. github.com/kafkajs/confluent-schema-registry/issues/71