Бесконечный потребитель AMQP с Alpakka

#akka-stream #alpakka

#akka-stream #alpakka

Вопрос:

Я пытаюсь реализовать очень простой сервис, подключенный к брокеру AMQP с помощью Alpakka. Я просто хочу, чтобы он получал сообщения из своей очереди в виде потока в момент их отправки по заданному обмену / теме.

В моих тестах, казалось, все работало нормально, но когда я попытался запустить свой сервис, я понял, что мой поток потреблял мои сообщения только один раз, а затем завершался.

В основном я использую код из документации Alpakka :

 def consume()={
    val amqpSource = AmqpSource.committableSource(
      TemporaryQueueSourceSettings(connectionProvider, exchangeName)
        .withDeclaration(exchangeDeclaration)
        .withRoutingKey(topic),
      bufferSize = prefetchCount
    )

    val amqpSink = AmqpSink.replyTo(AmqpReplyToSinkSettings(connectionProvider))

    amqpSource.mapAsync(4)(msg => onMessage(msg)).runWith(amqpSink)
}
  

Я пытался планировать consume() выполнение каждую секунду, но у меня возникли OutOfMemoryException проблемы.

Есть ли какой-нибудь правильный способ заставить этот код выполняться как бесконечный цикл?

Ответ №1:

Если вы хотите Source перезапустить его при сбое или отмене, оберните его с помощью RestartSource.withBackoff .

Комментарии:

1. Я уже пытался использовать RestartSource.withBackoff , но это не перезапускало исходный код по завершении, только при сбое. Я ищу источник, который никогда не заканчивается.

2. onFailuresWithBackoff делает это. withBackoff перезапускается как при сбое, так и при завершении.

3. Вы правы, RestartSource сделал это, спасибо! Моя проблема с RestartSource возникла из-за моего приемника. Кажется, теперь все работает нормально.