#join #apache-flink #temporal
Вопрос:
слева — таблица фактов из кафки
CREATE TABLE dig_user_join_kafka ( id string, username string, city_id string, create_time TIMESTAMP(3), WATERMARK FOR create_time AS create_time - INTERVAL '5' SECOND
)
справа находится таблица измерений из hbase
CREATE TABLE dim_city_hbase ( id string, info ROW< name string >, PRIMARY KEY (id) NOT ENFORCED )
Я хочу объединить временную таблицу со временем события
insert into dim_city_join_hbase select id as id, ROW(username, city, create_time) as info from ( select kj.id as id, kj.username as username, hj.info.name as city, kj.create_time as create_time from dig_user_join2_kafka kj left join dim_city_hbase FOR SYSTEM_TIME AS OF kj.create_time hj on kj.city_id = hj.id )
теперь ошибка в том, что
The main method caused an error: Event-Time Temporal Table Join requires both primary key and row time attribute in versioned table, but no row time attribute can be found
это означает ,что в таблице hbase нет времени строки, как установить время события hbase?
во многих примерах показано объединение временной таблицы hbase с proctime, но никто не использует время событий,
Ответ №1:
используйте системную функцию «PROCTIME()» или «CURRENT_ROW_TIMESTAMP ()», чтобы добавить поле в качестве времени строки, а затем установить водяной знак :
" update_time AS CURRENT_ROW_TIMESTAMP(), " " WATERMARK FOR update_time AS update_time, "