как установить время события hbase при объединении временной таблицы в flink sql

#join #apache-flink #temporal

Вопрос:

слева — таблица фактов из кафки

 CREATE TABLE dig_user_join_kafka ( id string, username string, city_id string, create_time TIMESTAMP(3), WATERMARK FOR create_time AS create_time - INTERVAL '5' SECOND  

)

справа находится таблица измерений из hbase

 CREATE TABLE dim_city_hbase (  id string,  info ROW< name string >,  PRIMARY KEY (id) NOT ENFORCED )  

Я хочу объединить временную таблицу со временем события

 insert into dim_city_join_hbase  select id as id,  ROW(username, city, create_time) as info  from (  select kj.id as id,  kj.username as username,  hj.info.name as city,  kj.create_time as create_time  from dig_user_join2_kafka kj left join dim_city_hbase FOR SYSTEM_TIME AS OF kj.create_time hj  on kj.city_id = hj.id )  

теперь ошибка в том, что

 The main method caused an error: Event-Time Temporal Table Join requires both primary key and row time attribute in versioned table, but no row time attribute can be found  

это означает ,что в таблице hbase нет времени строки, как установить время события hbase?

во многих примерах показано объединение временной таблицы hbase с proctime, но никто не использует время событий,

Ответ №1:

используйте системную функцию «PROCTIME()» или «CURRENT_ROW_TIMESTAMP ()», чтобы добавить поле в качестве времени строки, а затем установить водяной знак :

 " update_time AS CURRENT_ROW_TIMESTAMP(), "   " WATERMARK FOR update_time AS update_time, "