#typescript #microservices #nestjs #nestjs-config
Вопрос:
В Nestjs я использую кафку в качестве посредника сообщений и устанавливаю имя темы следующим образом:
@MessagePattern('topic-name')
async getNewRequest(@Payload() message: any): Promise<void> {
// my code goes here
}
Есть ли способ прочитать название темы кафки из модуля службы конфигурации?
Комментарии:
1. Привет…У меня та же проблема. Ты его починил?
Ответ №1:
Я справляюсь с этим, создавая новый пользовательский декоратор.
export function KafkaTopic(variable: string | keyof AppConfig): any {
return (
target: any,
key: string | symbol,
descriptor: PropertyDescriptor,
) => {
Reflect.defineMetadata(
KAFKA_TOPIC_METADATA,
variable,
descriptor.value,
);
return descriptor;
};
а затем динамически замените его на MessagePattern и задайте имя темы из AppConfig:
@Injectable()
export class KafkaDecoratorProcessorService {
constructor(
private readonly LOG: Logger,
private readonly appConfig: AppConfig,
) {
}
processKafkaDecorators(types: any[]) {
for (const type of types) {
const propNames = Object.getOwnPropertyNames(type.prototype);
for (const prop of propNames) {
const propValue = Reflect.getMetadata(
KAFKA_TOPIC_METADATA,
Reflect.get(type.prototype, prop),
);
if (propValue) {
const topic = this.appConfig[propValue];
this.LOG.log(`Setting topic ${topic} for ${type.name}#${prop}`);
Reflect.decorate(
[MessagePattern(topic)],
type.prototype,
prop,
Reflect.getOwnPropertyDescriptor(type.prototype, prop),
);
}
}
}
}
}
Вот как запустить processKafkaDecorators в файле main.ts:
const app = await NestFactory.create(AppModule);
app
.get(KafkaDecoratorProcessorService)
.processKafkaDecorators([AppController]);
app.connectMicroservice({
transport: Transport.KAFKA,
...
})
обратите внимание, что вы должны запустить его перед подключением микросервиса.
И используйте его вот так:
@KafkaTopic('KAFKA_TOPIC_BOOK_UPDATE')
async processMessage(
@Payload() { value: payload }: { value: BookUpdateModel },
) {
...
}