Оценка водяного знака для времени события в луче

#timestamp #apache-beam #watermark #stream-processing #apache-beam-kafkaio

#временная метка #apache-beam #водяной знак #потоковая обработка #apache-beam-kafkaio

Вопрос:

Я пытаюсь использовать Beam для агрегирования по набору данных, используя время события из данных и Kafka в качестве источника данных. Это работает, если все мои разделы kafka заполнены данными. Однако, как только раздел еще не был записан, водяной знак не может быть оценен и расширен.Моя политика временных меток следующая:

 public class CustomTimeStampPolicy
    extends TimestampPolicy<String, titan.ccp.model.records.ActivePowerRecord> {
  protected Instant currentWatermark;

  public CustomTimeStampPolicy(final Optional<Instant> previousWatermark) {
    this.currentWatermark = previousWatermark.orElse(BoundedWindow.TIMESTAMP_MIN_VALUE);
  }


  @Override
  public Instant getTimestampForRecord(final PartitionContext ctx,
      final KafkaRecord<String, titan.ccp.model.records.ActivePowerRecord> record) {
    this.currentWatermark = new Instant(record.getKV().getValue().getTimestamp());
    return this.currentWatermark;
  }

  @Override
  public Instant getWatermark(final PartitionContext ctx) {
    System.out.println("Current Watermark: "   this.currentWatermark);
    return this.currentWatermark;
  }
}
 

С 3 разделами Kafka, только один из которых заполнен данными, мои журналы показывают мне эти водяные знаки:

 Current Watermark: -290308-12-21T19:59:05.225Z
Current Watermark: 2020-12-09T10:42:29.909Z
Current Watermark: -290308-12-21T19:59:05.225Z
 

При запуске по умолчанию мои окна не будут срабатывать. Я предполагаю, что выходной водяной знак является минимальным по сравнению с водяными знаками разделов. И, следовательно, не будет продвигаться, пока некоторые из моих разделов пусты. Как я могу обрабатывать пустые разделы с обработкой времени события?

Ответ №1:

Если в раздел Kafka не записаны данные, Beam не может знать, что после записи элемента у него не будет произвольной временной метки в прошлом, отсюда и очень старый водяной знак.

Вы могли бы попробовать обновить свой конструктор политики временных меток, чтобы previousWatermark.orElse(wallTime - someMaximumSkew)

где someMaximumSkew самая большая задержка, которую вы могли ожидать увидеть для данных, записанных в kafka. Вы также можете рассмотреть возможность использования минимального значения предыдущего значения (если таковое имеется) и wallTime - someMaximumSkew перехода, когда некоторое время данные не записывались.