#apache-flink #flink-sql #flink-cep
Вопрос:
Я тестирую flink cep sql, и мой водяной знак определяется как время строки, моя таблица-таблица кафки. Поскольку водяной знак зависит от минимального из всех разделов кафки, поэтому каждое новое сообщение должно ждать выравнивания разделов кафки, а затем результатов запуска cep.
Моя таблица кафки(раздел состоит из 3 разделов) определяется как
create table test_table(
agent_id String, room_id String,
create_time Bigint,
call_type String,
application_id String,
connect_time Bigint,
row_time as to_timestamp_ltz(create_time, 3),
WATERMARK FOR row_time AS row_time - INTERVAL '5' SECOND
)
Вот мой cep sql
select * from test_table match_recognize (
partition by agent_id,room_id,call_type
order by row_time
measures
last(BF.create_time) as create_time,
last(AF.connect_time) as connect_time
one row per match after match SKIP PAST LAST ROW
pattern (BF AF) WITHIN INTERVAL '1' HOUR
define
BF as BF.connect_time = 0,
AF as AF.connect_time > 0 and BF.room_id = AF.room_id and BF.call_type = AF.call_type
) as T ;
Результат триггера sql cep правильный, но всегда запаздывает, потому что для каждого раздела требуется водяной знак выравнивания. Как я могу немедленно получить новейший результат или автоматически сгенерировать водяной знак в таблице sql flink?
Ответ №1:
Ваш шаблон просит найти строку, connect_time > 0
которая находится сразу после строки, где connect_time = 0
(где обе строки имеют одинаковый идентификатор room_id и тип вызова). Чтобы это сопоставление с образцом было выполнено совершенно правильно, необходимо дождаться водяных знаков. В противном случае преждевременное совпадение может быть признано недействительным из-за появления события, выходящего из строя, например, события с connect_time < 0
правом перед автофокусировкой. (Возможно, вы знаете, что это невозможно, но механизм cep/sql не может этого знать.)
Если вы хотите ослабить семантику сопоставления шаблонов, почему бы не заменить этот запрос MATCH_RECOGNIZE интервальным соединением (самостоятельным соединением с временным ограничением). Видишь https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/table/sql/queries/joins/#interval-joins для подробностей.
Кстати, эта часть определения AF
... and BF.room_id = AF.room_id and BF.call_type = AF.call_type
не оказывает никакого эффекта, так как поток уже разделен обоими room_id
и call_type
.
Комментарии:
1. Это Дэвид. Часть определения AF бессмысленна. В обычном Sql Flink я могу получить результат раннего запуска, установив table.exec.emit.early-fire.enabled и table.exec.emit.early-fire. задержка. Но, похоже, это не работает в cep sql.
2. Эти (недокументированные) параметры конфигурации являются экспериментальными и (до сих пор) применяются только к Windows.