потоки akka потребитель кафки останавливается через несколько секунд

#scala #akka #kafka-consumer-api #akka-stream #alpakka

Вопрос:

Я читаю от потребителя кафки, затем сохраняю сообщение в бд и отправляю производителю на другую тему. Мое приложение akka streams перестает работать через несколько секунд после запуска.

Вот как выглядит мой поток.

 Consumer.committableSource(consumerSettings, Subscriptions.topics(config.getString("topic")))
  .mapAsync(8) {
    msg => dbPersistActor.ask(msg.record.value()).map(_=> msg)
  }.async
  .map {
    msg =>
      ProducerMessage.Message(new ProducerRecord("test-output", msg.record.key(), msg.record.value())
        , passThrough = msg.committableOffset)
  }.via(Producer.flexiFlow(producerSettings))
  .map(_.passThrough)
  .via(Committer.flow(committerSettings))
  .runWith(Sink.ignore)
 

Комментарии:

1. Можете ли вы показать нам какие-либо сообщения в журнале? Вы проверили код выхода вашего потребительского приложения?

2. это с помощью Альпакки ? Кроме того, как выглядит остальная часть кода ?

3. да, я использую Альпакку. В этом классе не так уж много. Даже у dbPersistActor прямо сейчас есть инструкция для печати.

4. @shekhar Я не увидел в журнале ничего полезного. Это изящно останавливает потребителя, выходя из группы потребителей, а затем останавливаясь

5. Можете ли вы добавить некоторые инструкции журнала/ печати, например, то, что находится в «msg.record.value ()»? По умолчанию потребитель Akka завершает / завершает работу беззвучно, не выплевывая ошибку, но это поведение может быть изменено. Ознакомьтесь с документацией Akka, и вы найдете, как справиться с состоянием ошибки. Если вы добавите этот код, то увидите, что происходит.