Flink: как выполнить эмиссию после слияния?

#apache-flink

#apache-flink

Вопрос:

Я определяю Transaction класс:

 case class Transaction(accountId: Long, amount: Long, timestamp: Long)
 

TransactionSource Просто испускается Transaction с некоторым интервалом времени. Теперь я хочу вычислить последние 2 метки времени транзакции для каждого идентификатора учетной записи, см. Код ниже:

 import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment, _}
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.apache.flink.walkthrough.common.entity.Transaction
import org.apache.flink.walkthrough.common.source.TransactionSource

object LastNJob {

  final val QUERY =
    """
      |WITH last_n AS (
      |    SELECT accountId, `timestamp`
      |    FROM (
      |        SELECT *,
      |            ROW_NUMBER() OVER (PARTITION BY accountId ORDER BY `timestamp` DESC) AS row_num
      |        FROM transactions
      |    )
      |    WHERE row_num <= 2
      |)
      |SELECT accountId, LISTAGG(CAST(`timestamp` AS STRING)) last2_timestamp
      |FROM last_n
      |GROUP BY accountId
      |""".stripMargin

  def main(args: Array[String]): Unit = {
    val settings: EnvironmentSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
    val streamEnv: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    val tableEnv: StreamTableEnvironment = StreamTableEnvironment.create(streamEnv, settings)

    val txnStream: DataStream[Transaction] = streamEnv
      .addSource(new TransactionSource)
      .name("transactions")

    tableEnv.createTemporaryView("transactions", txnStream)

    tableEnv.executeSql(QUERY).print()
  }
}
 

Когда я запускаю программу, я получаю:

  ---- ---------------------- -------------------------------- 
| op |            accountId |                last2_timestamp |
 ---- ---------------------- -------------------------------- 
|  I |                    1 |                  1546272000000 |
|  I |                    2 |                  1546272360000 |
|  I |                    3 |                  1546272720000 |
|  I |                    4 |                  1546273080000 |
|  I |                    5 |                  1546273440000 |
| -U |                    1 |                  1546272000000 |
|  U |                    1 |    1546272000000,1546273800000 |
| -U |                    2 |                  1546272360000 |
|  U |                    2 |    1546272360000,1546274160000 |
| -U |                    3 |                  1546272720000 |
|  U |                    3 |    1546272720000,1546274520000 |
| -U |                    4 |                  1546273080000 |
|  U |                    4 |    1546273080000,1546274880000 |
| -U |                    5 |                  1546273440000 |
|  U |                    5 |    1546273440000,1546275240000 |
| -U |                    1 |    1546272000000,1546273800000 |
|  U |                    1 |                  1546273800000 |
| -U |                    1 |                  1546273800000 |
|  U |                    1 |    1546273800000,1546275600000 |
(to continue)
 

Давайте сосредоточимся на последней транзакции (сверху) AccountId=1 . Когда происходит новая транзакция со счета 1, которая происходит с отметкой времени = 1546275600000, всего выполняется 4 операции.

  ---- ---------------------- -------------------------------- 
| op |            accountId |                last2_timestamp |
 ---- ---------------------- -------------------------------- 
| -U |                    1 |    1546272000000,1546273800000 |
|  U |                    1 |                  1546273800000 |
| -U |                    1 |                  1546273800000 |
|  U |                    1 |    1546273800000,1546275600000 |
 

Пока я только хочу передать приведенный ниже «новый статус» моему нижестоящему потоку (скажем, другой теме Кафки) посредством какого-то слияния:

  ---------------------- -------------------------------- 
|            accountId |                last2_timestamp |
 ---------------------- -------------------------------- 
|                    1 |    1546273800000,1546275600000 |
 

Чтобы мой нисходящий поток мог буквально использовать «последние 2 метки времени транзакции каждой учетной записи»:

  ---------------------- -------------------------------- 
|            accountId |                last2_timestamp |
 ---------------------- -------------------------------- 
|                    1 |                  1546272000000 |
|                    1 |    1546272000000,1546273800000 |
|                    1 |    1546273800000,1546275600000 |
(to continue)
 

Как правильно это сделать?

Комментарии:

1. Ответ на этот вопрос был получен в списке рассылки: apache-flink-user-mailing-list-archive.2336050.n4.nabble.com /…

Ответ №1:

Вот краткое изложение ответа Тимо Вальтера из списка рассылки пользователей Apache.


Решение 1: используйте API потока toRetractStream данных и filter отфильтровывайте событие удаления, чтобы только события увеличения передавались в нисходящий поток

Решение 2. Внедрите UDF для объединения операций Top-2 и LIST_AGG в одну.