#apache-flink
#apache-flink
Вопрос:
Я определяю Transaction
класс:
case class Transaction(accountId: Long, amount: Long, timestamp: Long)
TransactionSource
Просто испускается Transaction
с некоторым интервалом времени. Теперь я хочу вычислить последние 2 метки времени транзакции для каждого идентификатора учетной записи, см. Код ниже:
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment, _}
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.apache.flink.walkthrough.common.entity.Transaction
import org.apache.flink.walkthrough.common.source.TransactionSource
object LastNJob {
final val QUERY =
"""
|WITH last_n AS (
| SELECT accountId, `timestamp`
| FROM (
| SELECT *,
| ROW_NUMBER() OVER (PARTITION BY accountId ORDER BY `timestamp` DESC) AS row_num
| FROM transactions
| )
| WHERE row_num <= 2
|)
|SELECT accountId, LISTAGG(CAST(`timestamp` AS STRING)) last2_timestamp
|FROM last_n
|GROUP BY accountId
|""".stripMargin
def main(args: Array[String]): Unit = {
val settings: EnvironmentSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
val streamEnv: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv: StreamTableEnvironment = StreamTableEnvironment.create(streamEnv, settings)
val txnStream: DataStream[Transaction] = streamEnv
.addSource(new TransactionSource)
.name("transactions")
tableEnv.createTemporaryView("transactions", txnStream)
tableEnv.executeSql(QUERY).print()
}
}
Когда я запускаю программу, я получаю:
---- ---------------------- --------------------------------
| op | accountId | last2_timestamp |
---- ---------------------- --------------------------------
| I | 1 | 1546272000000 |
| I | 2 | 1546272360000 |
| I | 3 | 1546272720000 |
| I | 4 | 1546273080000 |
| I | 5 | 1546273440000 |
| -U | 1 | 1546272000000 |
| U | 1 | 1546272000000,1546273800000 |
| -U | 2 | 1546272360000 |
| U | 2 | 1546272360000,1546274160000 |
| -U | 3 | 1546272720000 |
| U | 3 | 1546272720000,1546274520000 |
| -U | 4 | 1546273080000 |
| U | 4 | 1546273080000,1546274880000 |
| -U | 5 | 1546273440000 |
| U | 5 | 1546273440000,1546275240000 |
| -U | 1 | 1546272000000,1546273800000 |
| U | 1 | 1546273800000 |
| -U | 1 | 1546273800000 |
| U | 1 | 1546273800000,1546275600000 |
(to continue)
Давайте сосредоточимся на последней транзакции (сверху) AccountId=1 . Когда происходит новая транзакция со счета 1, которая происходит с отметкой времени = 1546275600000, всего выполняется 4 операции.
---- ---------------------- --------------------------------
| op | accountId | last2_timestamp |
---- ---------------------- --------------------------------
| -U | 1 | 1546272000000,1546273800000 |
| U | 1 | 1546273800000 |
| -U | 1 | 1546273800000 |
| U | 1 | 1546273800000,1546275600000 |
Пока я только хочу передать приведенный ниже «новый статус» моему нижестоящему потоку (скажем, другой теме Кафки) посредством какого-то слияния:
---------------------- --------------------------------
| accountId | last2_timestamp |
---------------------- --------------------------------
| 1 | 1546273800000,1546275600000 |
Чтобы мой нисходящий поток мог буквально использовать «последние 2 метки времени транзакции каждой учетной записи»:
---------------------- --------------------------------
| accountId | last2_timestamp |
---------------------- --------------------------------
| 1 | 1546272000000 |
| 1 | 1546272000000,1546273800000 |
| 1 | 1546273800000,1546275600000 |
(to continue)
Как правильно это сделать?
Комментарии:
1. Ответ на этот вопрос был получен в списке рассылки: apache-flink-user-mailing-list-archive.2336050.n4.nabble.com /…
Ответ №1:
Вот краткое изложение ответа Тимо Вальтера из списка рассылки пользователей Apache.
Решение 1: используйте API потока toRetractStream
данных и filter
отфильтровывайте событие удаления, чтобы только события увеличения передавались в нисходящий поток
Решение 2. Внедрите UDF для объединения операций Top-2 и LIST_AGG в одну.