Использование createDrainingControl в потребительском API?

#akka #kafka-consumer-api #akka-stream #alpakka

#akka #кафка-потребитель-api #akka-поток #alpakka

Вопрос:

Я просматривал документацию по потребительскому API для Kafka в Alpakka. Я наткнулся на этот фрагмент кода. Насколько я понимаю, смещение фиксируется с помощью msg.committableOffset(). Тогда зачем нам нужны .toMat() и mapMaterializedValue() . Разве я не могу просто прикрепить его к Sink.Игнорировать()?

 Consumer.committableSource(consumerSettings, Subscriptions.topics(topic))
      .mapAsync(
          1,
          msg ->
              business(msg.record().key(), msg.record().value())
                  .thenApply(done -> msg.committableOffset()))
      .toMat(Committer.sink(committerSettings.withMaxBatch(1)), Keep.both())
      .mapMaterializedValue(Consumer::createDrainingControl)
      .run(materializer);
  

Ответ №1:

Вы не можете подключиться к Sink.ignore, потому что вы уже подключили Commiter.Sink. Но вы можете отказаться от материализованных значений.

В качестве примера используется toMat с Keep.both для сохранения обоих материализованных значений, элемента управления из источника и будущего [Done] из приемника. С обоими значениями он создает DrainingControl в mapMaterializedValue, который позволяет вам останавливать поток или сливать поток перед остановкой или получать уведомления, когда поток останавливается.

Если вас не волнует этот элемент управления (хотя вы должны), вы можете использовать

 Consumer.committableSource(consumerSettings, Subscriptions.topics(topic))
      .mapAsync(
          1,
          msg ->
              business(msg.record().key(), msg.record().value())
                  .thenApply(done -> msg.committableOffset()))
      .to(Committer.sink(committerSettings.withMaxBatch(1)))
      .run(materializer);
  

Комментарии:

1. Спасибо за объяснение, как вы упомянули, DrainingControl позволяет мне останавливать поток или сливать его. В случае, если я этого не сделаю, будут ли новые сообщения Kafka в теме не выбраны потребителем? Я думаю, я пытаюсь выяснить использование drainControl