Не получает сообщений в очереди ответов, отправленных в RabbitMQ с помощью amqplib и обработанных NestJS

#node.js #rabbitmq #nestjs

Вопрос:

Поэтому я использую NestJS (v8) с транспортом RabbitMQ ( Transport.RMQ ) для прослушивания сообщений

Мой код NestJS выглядит примерно так:

 // main.ts

const app = await NestFactory.createMicroservice<MicroserviceOptions>(AppModule, {
  transport: Transport.RMQ,
  options: {
    urls: ['amqp://localhost:5672'],
    queue: 'my-queue',
    replyQueue: 'my-reply-queue'
  },
});
 
 // my.controller.ts

import { Controller } from '@nestjs/common';
import { MessagePattern } from '@nestjs/microservices';

@Controller()
export class MyController {
  @MessagePattern('something')
  do(data: {source: string}): {source: string} {
    console.log(data);

    data.source  = ' | MyController';

    return data;
  }
}
 

И в Узле.Приложение JS, которое я использую amqplib для отправки в приложение NestJS и получения ответа

это код узла.Приложение JS:

 const queueName = 'my-queue';
const replyQueueName = 'my-reply-queue';

const amqplib = require('amqplib');

async function run() {
  const conn = await amqplib.connect('amqp://localhost:5672');
  const channel = await conn.createChannel();

  await channel.assertQueue(queueName);
  await channel.assertQueue(replyQueueName);

  // Consumer: Listen to messages from the reply queue
  await channel.consume(replyQueueName, (msg) => console.log(msg.content.toString()));

  // Publisher: Send message to the queue
  channel.sendToQueue(
    queueName,
    Buffer.from(
      JSON.stringify({
        pattern: 'something',
        data: { source: 'node-application' },
      })
    ),
    { replyTo: replyQueueName }
  ); 
}

run()

 

Когда я запускаю узел и Nest.JS приложения, Гнездо.JS получает сообщение от Узла.JS издатель, но Узел.Потребителю JS никогда не звонят с ответом

Ответ №1:

Исправление состояло в том, чтобы добавить id ключ в данные, которые содержит узел.Приложение JS отправляет:

 // ...

// Publisher: Send message to the queue
channel.sendToQueue(
  queueName,
  Buffer.from(
    JSON.stringify({
      // Add the `id` key here so the Node.js consumer will get the message in the reply queue
      id: '',
      
      pattern: 'something',
      data: { source: 'node-application' },
    })
  ),
  { replyTo: replyQueueName }
); 

// ...
 

Подробное объяснение (в Nest.JS исходный код)

Это связано с тем, что в handleMessage функции в server-rmq.ts файле есть проверка, является ли id свойство сообщения undefined

 // https://github.com/nestjs/nest/blob/026c1bd61c561a3ad24da425d6bca27d47567bfd/packages/microservices/server/server-rmq.ts#L139-L141

 public async handleMessage(
    message: Record<string, any>,
    channel: any,
  ): Promise<void> {
    // ...

    if (isUndefined((packet as IncomingRequest).id)) {
      return this.handleEvent(pattern, packet, rmqContext);
    }

    // ...
  }
 

И в handleEvent функции нет логики отправки сообщений в очередь ответов, просто обработка события