как получить последний результат при выполнении объединения таблиц (используя toRetractStream в flink sql

#apache-flink #flink-streaming #flink-sql #flink-table-api

Вопрос:

Вот мой код, использующий flink sql API для объединения двух таблиц

 tEnv.createTemporaryView("A", streamA,"speed_sum,cnt,window_start_time,window_end_time");
tEnv.createTemporaryView("B",streamB,"speed_sum,cnt,window_start_time,window_end_time");

String execSQL1 = "select A.speed_sum COALESCE(B.speed_sum,0.0), "  
        "A.cnt COALESCE(B.cnt,0), "  
        "A.window_start_time, A.window_end_time "  
        "from A "  
        "left join B on A.window_start_time = B.window_start_time ";
Table table = tEnv.sqlQuery(execSQL1);
DataStream<Tuple2<Boolean, Row>> streamResult = tEnv.toRetractStream(table, Row.class).;
streamResult.print("streamResult");
 

Мой вывод выглядит следующим образом:

  streamA-----------(5078.000000,199,1635333650000,1635333660000)
 streamB-----------(1721.388891,111,1635333650000,1635333660000)
 streamResult:3> (true,5078.0,199,1635333650000,1635333660000) // drop
 streamResult:3> (false,5078.0,199,1635333650000,1635333660000) // drop
 streamResult:3> (true,6799.388891220093,310,1635333650000,1635333660000)  // want to save
 

Как вы можете видеть, toRetractStream API сгенерирует три фрагмента записи. Мне интересно, как получить последний фрагмент записи, который правильно суммирует A.speed_sum и B.speed_sum ( A.cnt и B.cnt ).

Ответ №1:

Некоторые потоковые SQL-запросы, такие как ваше ОБЪЕДИНЕНИЕ, создают поток обновлений. Учитывая непрерывный, неограниченный характер потоковой передачи, у Flink нет возможности узнать, когда достигнут «окончательный» результат.

Если вы выполняете этот запрос на ограниченных входных данных, вы можете выполнить его в пакетном режиме, и тогда будет напечатан только конечный результат.

В некоторых случаях использования потоковой передачи вы можете использовать атрибуты времени, а не временные метки, и тогда планировщик Flink SQL сможет определить, когда результаты для определенных запросов будут завершены. Например, именно так Windows в Flink SQL может создавать поток добавления, а не поток обновления. Ваш запрос — это почти интервальное соединение. Если бы это было интервальное объединение, то результирующий поток был бы потоком добавления, и вам не пришлось бы иметь дело с этими ретракциями.