Динамическое имя темы кафки в микросервисе nestjs

#typescript #microservices #nestjs #nestjs-config

Вопрос:

В Nestjs я использую кафку в качестве посредника сообщений и устанавливаю имя темы следующим образом:

 @MessagePattern('topic-name')
async getNewRequest(@Payload() message: any): Promise<void> {
  // my code goes here
}
 

Есть ли способ прочитать название темы кафки из модуля службы конфигурации?

Комментарии:

1. Привет…У меня та же проблема. Ты его починил?

Ответ №1:

Я справляюсь с этим, создавая новый пользовательский декоратор.

 export function KafkaTopic(variable: string | keyof AppConfig): any {
  return (
    target: any,
    key: string | symbol,
    descriptor: PropertyDescriptor,
  ) => {
    Reflect.defineMetadata(
      KAFKA_TOPIC_METADATA,
      variable,
      descriptor.value,
    );
    return descriptor;
  };
 

а затем динамически замените его на MessagePattern и задайте имя темы из AppConfig:

 @Injectable()
export class KafkaDecoratorProcessorService {
  constructor(
    private readonly LOG: Logger,
    private readonly appConfig: AppConfig,
  ) {
  }

  processKafkaDecorators(types: any[]) {
    for (const type of types) {
      const propNames = Object.getOwnPropertyNames(type.prototype);
      for (const prop of propNames) {
        const propValue = Reflect.getMetadata(
          KAFKA_TOPIC_METADATA,
          Reflect.get(type.prototype, prop),
        );

        if (propValue) {
          const topic = this.appConfig[propValue];
          this.LOG.log(`Setting topic ${topic} for ${type.name}#${prop}`);
          Reflect.decorate(
            [MessagePattern(topic)],
            type.prototype,
            prop,
            Reflect.getOwnPropertyDescriptor(type.prototype, prop),
          );
        }
      }
    }
  }
}
 

Вот как запустить processKafkaDecorators в файле main.ts:

 const app = await NestFactory.create(AppModule);
  app
    .get(KafkaDecoratorProcessorService)
    .processKafkaDecorators([AppController]);

  app.connectMicroservice({
    transport: Transport.KAFKA,
    ...
   })
 

обратите внимание, что вы должны запустить его перед подключением микросервиса.
И используйте его вот так:

 @KafkaTopic('KAFKA_TOPIC_BOOK_UPDATE')
  async processMessage(
    @Payload() { value: payload }: { value: BookUpdateModel },
  ) {
    ...
  }
 

Источник