#apache-flink #flink-streaming #flink-sql #flink-table-api
Вопрос:
Вот мой код, использующий flink sql API для объединения двух таблиц
tEnv.createTemporaryView("A", streamA,"speed_sum,cnt,window_start_time,window_end_time");
tEnv.createTemporaryView("B",streamB,"speed_sum,cnt,window_start_time,window_end_time");
String execSQL1 = "select A.speed_sum COALESCE(B.speed_sum,0.0), "
"A.cnt COALESCE(B.cnt,0), "
"A.window_start_time, A.window_end_time "
"from A "
"left join B on A.window_start_time = B.window_start_time ";
Table table = tEnv.sqlQuery(execSQL1);
DataStream<Tuple2<Boolean, Row>> streamResult = tEnv.toRetractStream(table, Row.class).;
streamResult.print("streamResult");
Мой вывод выглядит следующим образом:
streamA-----------(5078.000000,199,1635333650000,1635333660000)
streamB-----------(1721.388891,111,1635333650000,1635333660000)
streamResult:3> (true,5078.0,199,1635333650000,1635333660000) // drop
streamResult:3> (false,5078.0,199,1635333650000,1635333660000) // drop
streamResult:3> (true,6799.388891220093,310,1635333650000,1635333660000) // want to save
Как вы можете видеть, toRetractStream
API сгенерирует три фрагмента записи. Мне интересно, как получить последний фрагмент записи, который правильно суммирует A.speed_sum
и B.speed_sum
( A.cnt
и B.cnt
).
Ответ №1:
Некоторые потоковые SQL-запросы, такие как ваше ОБЪЕДИНЕНИЕ, создают поток обновлений. Учитывая непрерывный, неограниченный характер потоковой передачи, у Flink нет возможности узнать, когда достигнут «окончательный» результат.
Если вы выполняете этот запрос на ограниченных входных данных, вы можете выполнить его в пакетном режиме, и тогда будет напечатан только конечный результат.
В некоторых случаях использования потоковой передачи вы можете использовать атрибуты времени, а не временные метки, и тогда планировщик Flink SQL сможет определить, когда результаты для определенных запросов будут завершены. Например, именно так Windows в Flink SQL может создавать поток добавления, а не поток обновления. Ваш запрос — это почти интервальное соединение. Если бы это было интервальное объединение, то результирующий поток был бы потоком добавления, и вам не пришлось бы иметь дело с этими ретракциями.