#akka-stream #alpakka
#akka-stream #alpakka
Вопрос:
Я пытаюсь реализовать очень простой сервис, подключенный к брокеру AMQP с помощью Alpakka. Я просто хочу, чтобы он получал сообщения из своей очереди в виде потока в момент их отправки по заданному обмену / теме.
В моих тестах, казалось, все работало нормально, но когда я попытался запустить свой сервис, я понял, что мой поток потреблял мои сообщения только один раз, а затем завершался.
В основном я использую код из документации Alpakka :
def consume()={
val amqpSource = AmqpSource.committableSource(
TemporaryQueueSourceSettings(connectionProvider, exchangeName)
.withDeclaration(exchangeDeclaration)
.withRoutingKey(topic),
bufferSize = prefetchCount
)
val amqpSink = AmqpSink.replyTo(AmqpReplyToSinkSettings(connectionProvider))
amqpSource.mapAsync(4)(msg => onMessage(msg)).runWith(amqpSink)
}
Я пытался планировать consume()
выполнение каждую секунду, но у меня возникли OutOfMemoryException
проблемы.
Есть ли какой-нибудь правильный способ заставить этот код выполняться как бесконечный цикл?
Ответ №1:
Если вы хотите Source
перезапустить его при сбое или отмене, оберните его с помощью RestartSource.withBackoff
.
Комментарии:
1. Я уже пытался использовать
RestartSource.withBackoff
, но это не перезапускало исходный код по завершении, только при сбое. Я ищу источник, который никогда не заканчивается.2.
onFailuresWithBackoff
делает это.withBackoff
перезапускается как при сбое, так и при завершении.3. Вы правы, RestartSource сделал это, спасибо! Моя проблема с
RestartSource
возникла из-за моего приемника. Кажется, теперь все работает нормально.