Как реализовать rabbit mq или bull js в loopback 4

#loopbackjs #loopback4

#loopbackjs #loopback4

Вопрос:

Я пытаюсь реализовать фоновую службу, чтобы уменьшить нагрузку на вызов API. Запустится фоновая задача для загрузки файлов на S3 и отправки электронной почты с помощью nodemailer.

Ответ №1:

Следуя шаблонам проектирования loopback 4, вы можете создать поставщика услуг и внедрить его в свой сервис или контроллер. Вот очень простой пример с Bull.js:

 import Bull, { Queue, Job } from "bull";
import {Provider, service} from "@loopback/core";
import {get} from "@loopback/rest";

export function audioProcessor(job: Job) {
  console.log(
    `Processing audio file: ${job.data.filename}`,
    `Audio bitrate: ${job.data.bitrate}`
  );
}

export class AudioQueueProvider implements Provider<Queue> {
  async value() {
    const queue = new Bull("AudioQueue", {
      redis: { port: 6379, host: "127.0.0.1" }
    });

    queue.process(audioProcessor);
    
    return queue;
  }
}

export class AudioController {
  constructor(
    @service(AudioQueueProvider) public queue: Queue;
  ) {}

  @get('/process-audio')
  async addToQueue(): Promise<string> {
    await this.queue.add(
      {
        filename: 'process_me.wav',
        bitrate: 320,
      }
    );
    return 'Audio file added to the AudioQueue for processing';
  }
}

  

Реализация RabbitMQ должна быть аналогичной (не тестировалась):

 import { Provider, service } from "@loopback/core";
import {get} from "@loopback/rest";
import amqp from "amqplib/callback_api";

export function audioProcessor(msg: any) {
  console.log(
    `Processing audio file: ${msg.content.filename}`,
    `Audio bitrate: ${msg.content.bitrate}`
  );
}

export class AudioQueueProvider implements Provider<any> {
  async value() {
    const CONN_URL = "amqp://localhost";
    let ch = null;
    const channelName = 'AudioQueue';
    
    amqp.connect(CONN_URL, function (err, conn) {
      conn.createChannel(function (err, channel) {
        ch = channel;
      });
    });

    ch.assertQueue(channelName, {
      durable: false
    });

    ch.consume(channelName, audioProcessor, {
      noAck: true
    });
    
    return ch;
  }
}

export class AudioController {
  constructor(
    @service(AudioQueueProvider) public channel: any;
  ) {}

  @get('/process-audio')
  async addToQueue(): Promise<string> {
    await this.channel.sendToQueue(
      'AudioQueue',
      {
        filename: 'process_me.wav',
        bitrate: 320,
      }
    );
    return 'Audio file added to the AudioQueue for processing';
  }
}

  

Комментарии:

1. @service(AudioQueueProvider) public queue: Queue у меня не сработало. Мне пришлось привязать AudioQueueProvider в конструкторе приложения и внедрить в контроллер @inject(QUEUE_SERVICE_BINDING_KEY) public queue: Queue