#apache-flink
Вопрос:
Ниже приводится определение WatermarkGenerator
,
@Public
public interface WatermarkGenerator<T> {
/**
* Called for every event, allows the watermark generator to examine and remember the
* event timestamps, or to emit a watermark based on the event itself.
*/
void onEvent(T event, long eventTimestamp, WatermarkOutput output);
/**
* Called periodically, and might emit a new watermark, or not.
*
* <p>The interval in which this method is called and Watermarks are generated
* depends on {@link ExecutionConfig#getAutoWatermarkInterval()}.
*/
void onPeriodicEmit(WatermarkOutput output);
}
За onEvent
метод, почему существует избыточный аргумент long eventTimestamp
в сигнатуре метода, я думаю, что время события должны быть извлечены из T event
(конкретный тип события должны носить с Время события), и она должна быть равна eventTimestamp, поэтому я хотел спросить, почему этот избыточной agument long eventTimestamp
нужен, так как я могу сделать это из события,что безопасность здесь?
Ответ №1:
Метка времени, переданная в метод, является текущей меткой времени в конверте, в котором находится ваше событие. onEvent
StreamRecord
Это независимо от того, какая метка времени ранее была назначена этому событию-например, в случае Кафки это может быть значение метки времени в заголовке записи Кафки.
Хотя это, как правило, избыточная информация, бывают ситуации, когда полезно иметь доступ к метке времени, ранее назначенной выше по потоку.
Ответ №2:
Я думаю, что логика состоит в том, чтобы WatermarkGenerator
не делать никаких предположений относительно того, откуда берется время события.
Типичным случаем действительно является то, когда мы, как разработчик, предоставляем TimestampAssigner
несколько шагов раньше, чтобы извлечь время события из каждого события. Даже в этом случае, вероятно , желательно не повторять такую логику в WatermarkGenerator
, чтобы избежать сцепления, особенно если процесс более сложный, чем просто чтение одного поля. Так что это первая мотивация для предоставления его здесь.
Другой типичный случай-это когда время события указывается самим источником данных, независимо от любого поля в каждом событии. Например, можно было бы использовать соединитель Кафки таким образом, чтобы он получал время события из метаданных метки времени кафки независимо от полезной нагрузки события.