#timestamp #apache-beam #watermark #stream-processing #apache-beam-kafkaio
#временная метка #apache-beam #водяной знак #потоковая обработка #apache-beam-kafkaio
Вопрос:
Я пытаюсь использовать Beam для агрегирования по набору данных, используя время события из данных и Kafka в качестве источника данных. Это работает, если все мои разделы kafka заполнены данными. Однако, как только раздел еще не был записан, водяной знак не может быть оценен и расширен.Моя политика временных меток следующая:
public class CustomTimeStampPolicy
extends TimestampPolicy<String, titan.ccp.model.records.ActivePowerRecord> {
protected Instant currentWatermark;
public CustomTimeStampPolicy(final Optional<Instant> previousWatermark) {
this.currentWatermark = previousWatermark.orElse(BoundedWindow.TIMESTAMP_MIN_VALUE);
}
@Override
public Instant getTimestampForRecord(final PartitionContext ctx,
final KafkaRecord<String, titan.ccp.model.records.ActivePowerRecord> record) {
this.currentWatermark = new Instant(record.getKV().getValue().getTimestamp());
return this.currentWatermark;
}
@Override
public Instant getWatermark(final PartitionContext ctx) {
System.out.println("Current Watermark: " this.currentWatermark);
return this.currentWatermark;
}
}
С 3 разделами Kafka, только один из которых заполнен данными, мои журналы показывают мне эти водяные знаки:
Current Watermark: -290308-12-21T19:59:05.225Z
Current Watermark: 2020-12-09T10:42:29.909Z
Current Watermark: -290308-12-21T19:59:05.225Z
При запуске по умолчанию мои окна не будут срабатывать. Я предполагаю, что выходной водяной знак является минимальным по сравнению с водяными знаками разделов. И, следовательно, не будет продвигаться, пока некоторые из моих разделов пусты. Как я могу обрабатывать пустые разделы с обработкой времени события?
Ответ №1:
Если в раздел Kafka не записаны данные, Beam не может знать, что после записи элемента у него не будет произвольной временной метки в прошлом, отсюда и очень старый водяной знак.
Вы могли бы попробовать обновить свой конструктор политики временных меток, чтобы previousWatermark.orElse(wallTime - someMaximumSkew)
где someMaximumSkew
самая большая задержка, которую вы могли ожидать увидеть для данных, записанных в kafka. Вы также можете рассмотреть возможность использования минимального значения предыдущего значения (если таковое имеется) и wallTime - someMaximumSkew
перехода, когда некоторое время данные не записывались.