#scala #akka #kafka-consumer-api #akka-stream #alpakka
Вопрос:
Я читаю от потребителя кафки, затем сохраняю сообщение в бд и отправляю производителю на другую тему. Мое приложение akka streams перестает работать через несколько секунд после запуска.
Вот как выглядит мой поток.
Consumer.committableSource(consumerSettings, Subscriptions.topics(config.getString("topic")))
.mapAsync(8) {
msg => dbPersistActor.ask(msg.record.value()).map(_=> msg)
}.async
.map {
msg =>
ProducerMessage.Message(new ProducerRecord("test-output", msg.record.key(), msg.record.value())
, passThrough = msg.committableOffset)
}.via(Producer.flexiFlow(producerSettings))
.map(_.passThrough)
.via(Committer.flow(committerSettings))
.runWith(Sink.ignore)
Комментарии:
1. Можете ли вы показать нам какие-либо сообщения в журнале? Вы проверили код выхода вашего потребительского приложения?
2. это с помощью Альпакки ? Кроме того, как выглядит остальная часть кода ?
3. да, я использую Альпакку. В этом классе не так уж много. Даже у dbPersistActor прямо сейчас есть инструкция для печати.
4. @shekhar Я не увидел в журнале ничего полезного. Это изящно останавливает потребителя, выходя из группы потребителей, а затем останавливаясь
5. Можете ли вы добавить некоторые инструкции журнала/ печати, например, то, что находится в «msg.record.value ()»? По умолчанию потребитель Akka завершает / завершает работу беззвучно, не выплевывая ошибку, но это поведение может быть изменено. Ознакомьтесь с документацией Akka, и вы найдете, как справиться с состоянием ошибки. Если вы добавите этот код, то увидите, что происходит.