Работает ли генерация водяных знаков по-разному в локальных / автономных режимах при использовании раздела kafka с простаивающими разделами?

#apache-kafka #apache-flink

#apache-kafka #apache-flink

Вопрос:

Я рассматриваю этот водяной знак:

 class MyWatermarker(val maxTimeLag: Long = 0)
    extends AssignerWithPeriodicWatermarks[MyEvent] {
  var maxTs: Long = 0

  override def extractTimestamp(e: MyEvent, previousElementTimestamp: Long): Long = {
    val timestamp = e.timestamp
    maxTs = maxTs.max(timestamp)
    timestamp
  }

  override def getCurrentWatermark: WatermarkOld = {
    println(s"event watermark: ${maxTs - maxTimeLag}")
    new WatermarkOld(maxTs - maxTimeLag)
  }
  

Базовые события поступают из источника kafka, а затем передаются функции процесса. Реализация не имеет отношения к вопросу, я просто поделюсь соответствующим битом:

   override def processElement(
    event: MyEvent,
    ctx: KeyedProcessFunction[String, MyEvent, MyEvent]#Context,
    out: Collector[StreamEvent]
  ): Unit = {
    println(
      s"In process function, got event: $event, ctx.timestamp: ${ctx.timestamp}, currentWatermark: ${ctx.timerService.currentWatermark}"
    )
  ...
  }
  

Когда я запускаю это приложение в реальном кластере kubernetes, используя исходную тему kafka с простаивающими разделами, водяной знак возвращается к 0, как и ожидалось:

 In process function, got event: xxx, ctx.timestamp: 1601475710619, currentWatermark: 0
  

Я также могу видеть эти журналы, созданные в watermarker:

 event watermark: 1601475710619
event watermark: 0
event watermark: 1601475710619
event watermark: 0
  

Самое смешное, что когда я запускаю одно и то же приложение локально в IntelliJ, а также имею простаивающие разделы kafka для той же темы, я также получаю вышеуказанные журналы от средства водяных знаков, причем водяной знак колеблется между 0 и ts последнего полученного элемента, поскольку maxLag = 0 . Однако, совершенно неожиданно для меня, журналы из функции process показывают, что водяной знак все еще продвигается:

 In process function, got event: xxx, ctx.timestamp: 1601475710619, currentWatermark: 1601475710618
  

Почему это происходит? К вашему сведению, я использую Flink 1.10 с параллелизмом среды, установленным на 2, и семантикой времени события в обоих случаях.

Комментарии:

1.Также в apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/….

Ответ №1:

Если бы вы использовали водяные знаки для каждого раздела, что вы можете сделать, вызвав assignTimestampsAndWatermarks непосредственно на потребителе Flink Kafka [1], то я считаю, что простаивающие разделы будут последовательно удерживать общий водяной знак.

При использовании водяных знаков для каждого раздела каждая исходная задача Kafka будет применять водяные знаки отдельно к каждому разделу, который она обрабатывает, а затем выделять в качестве водяного знака минимум водяных знаков для каждого раздела. Поэтому по крайней мере одна из исходных задач Kafka будет иметь водяной знак, равный 0, и при условии, что у вас есть keyBy после водяного знака и перед функцией процесса, это будет сдерживать водяной знак в функции процесса.

В противном случае, если вы применяете водяные знаки к выходным данным исходных задач Kafka, то наличие водяных знаков у задач водяных знаков 0 или нет, зависит от того, имеет ли их соответствующая исходная задача Kafka какие-либо неиспользуемые разделы. Если назначение разделов экземплярам не является детерминированным, это может объяснить, почему вы видите разные результаты в IntelliJ.

Обратите внимание, что обработка простаивающих источников была переработана в Flink 1.11 [2], и исправления ошибок, связанные с этим, все еще находятся на рассмотрении [3].

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/connectors/kafka.html#kafka-consumers-and-timestamp-extractionwatermark-emission.
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/event_timestamps_watermarks.html#dealing-with-idle-sources.
[3] https://issues.apache.org/jira/browse/FLINK-18934

Комментарии:

1. Спасибо, Дэвид, но что касается моего вопроса, является ли поведение, которое я наблюдаю, чем-то, что можно отнести к потенциальным различиям между локальным (IntelliJ) и автономным режимами, или этот момент не имеет никакого отношения, и это скорее совпадение из-за присущего вам недетерминизма, о котором вы упоминаете?

2. Ну, вы неявно тестируете что-то, где поведение не гарантируется API, так что да, возможно, что различия между локальным и удаленным выполнением являются фактором, способствующим недетерминированности, проявляющейся по-разному в этих двух случаях.