как автоматически сгенерировать водяной знак в sql-таблице flink?

#apache-flink #flink-sql #flink-cep

Вопрос:

Я тестирую flink cep sql, и мой водяной знак определяется как время строки, моя таблица-таблица кафки. Поскольку водяной знак зависит от минимального из всех разделов кафки, поэтому каждое новое сообщение должно ждать выравнивания разделов кафки, а затем результатов запуска cep.

Моя таблица кафки(раздел состоит из 3 разделов) определяется как

 create table test_table(
    agent_id String, room_id String, 
    create_time Bigint, 
    call_type String, 
    application_id String, 
    connect_time Bigint, 
    row_time as to_timestamp_ltz(create_time, 3), 
    WATERMARK FOR row_time AS row_time - INTERVAL '5' SECOND
)
 

Вот мой cep sql

 select * from test_table  match_recognize (
   partition by agent_id,room_id,call_type 
   order by row_time
   measures  
       last(BF.create_time) as create_time, 
       last(AF.connect_time) as connect_time 
   one row per match after match SKIP PAST LAST ROW 
   pattern (BF  AF) WITHIN INTERVAL '1' HOUR 
   define 
       BF as BF.connect_time = 0,
       AF as AF.connect_time > 0 and BF.room_id = AF.room_id and BF.call_type = AF.call_type 
) as T ;
 

Результат триггера sql cep правильный, но всегда запаздывает, потому что для каждого раздела требуется водяной знак выравнивания. Как я могу немедленно получить новейший результат или автоматически сгенерировать водяной знак в таблице sql flink?

Ответ №1:

Ваш шаблон просит найти строку, connect_time > 0 которая находится сразу после строки, где connect_time = 0 (где обе строки имеют одинаковый идентификатор room_id и тип вызова). Чтобы это сопоставление с образцом было выполнено совершенно правильно, необходимо дождаться водяных знаков. В противном случае преждевременное совпадение может быть признано недействительным из-за появления события, выходящего из строя, например, события с connect_time < 0 правом перед автофокусировкой. (Возможно, вы знаете, что это невозможно, но механизм cep/sql не может этого знать.)

Если вы хотите ослабить семантику сопоставления шаблонов, почему бы не заменить этот запрос MATCH_RECOGNIZE интервальным соединением (самостоятельным соединением с временным ограничением). Видишь https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/table/sql/queries/joins/#interval-joins для подробностей.

Кстати, эта часть определения AF

 ... and BF.room_id = AF.room_id and BF.call_type = AF.call_type
 

не оказывает никакого эффекта, так как поток уже разделен обоими room_id и call_type .

Комментарии:

1. Это Дэвид. Часть определения AF бессмысленна. В обычном Sql Flink я могу получить результат раннего запуска, установив table.exec.emit.early-fire.enabled и table.exec.emit.early-fire. задержка. Но, похоже, это не работает в cep sql.

2. Эти (недокументированные) параметры конфигурации являются экспериментальными и (до сих пор) применяются только к Windows.