Flink SQL, как получить первую запись и последнюю запись по времени события в потоке данных kafka и сохранить ее в БД (например, GP, MySQL)?

#apache-kafka #apache-flink #flink-sql

#апачи-кафка #apache-flink #flink-sql

Вопрос:

Flink SQL , как получить первую запись и последнюю запись по времени события в Kafka потоке данных и сохранить ее в БД (например, MySQL )?

Кроме того, если поступает новая запись в Kafka потоке данных, мы должны обновить эту запись MySQL .

  1. Предположение, записи в Kafka нем следующие:
     {'word': 'hello', 'eventtime': '2020-12-04 16:00:00', 'appear_page': 5}
    {'word': 'hello', 'eventtime': '2020-12-04 16:05:00', 'appear_page': 10}
    {'word': 'hello', 'eventtime': '2020-12-04 16:10:00', 'appear_page': 15}
    {'word': 'are', 'eventtime': '2020-12-04 16:05:00', 'appear_page': 15}      
    {'word': 'hello', 'eventtime': '2020-12-04 16:15:00', 'appear_page': 20}
    {'word': 'are', 'eventtime': '2020-12-04 16:00:00', 'appear_page': 12}
    {'word': 'are', 'eventtime': '2020-12-04 16:10:00''appear_page': 18}

 
  1. Таким образом Flink SQL , результаты , которых я ожидал , следующие:
     {'word': 'hello', 'eventtime': '2020-12-04 16:00:00', 'appear_page': 5}
    {'word': 'hello', 'eventtime': '2020-12-04 16:15:00', 'appear_page': 20}
    {'word': 'are', 'eventtime': '2020-12-04 16:00:00', 'appear_page': 12}
    {'word': 'are', 'eventtime': '2020-12-04 16:10:00', 'appear_page': 18}
 
  1. Мы сохраняем эти записи в MySQL , предполагая , что результат будет следующим,
     |    word    |    first_appearance_time    |    first_appearance_page    |    last_appearance_time    |    last_appearance_page    |
    |    hello   |    2020-12-04 16:00:00      |            5                |    2020-12-04 16:15:00     |             20             |
    |    are     |    2020-12-04 16:00:00      |            12               |    2020-12-04 16:10:00     |             18             |
 
  1. Если грядет новая запись Kafka ,
     {'word': 'are', 'eventtime': '2020-12-04 17:18:00', 'appear_page': 30}
 
  1. Я надеюсь, что мы сможем обновить запись are in MySQL , результат обновления выглядит следующим образом:
     |    word    |    first_appearance_time    |    first_appearance_page    |    last_appearance_time    |    last_appearance_page    |
    |    hello   |    2020-12-04 16:00:00      |            5                |    2020-12-04 16:15:00     |             20             |
    |    are     |    2020-12-04 16:00:00      |            12               |    2020-12-04 17:18:00     |             30             |
 

У меня возникли некоторые проблемы на 2-м и 5-м этапах, кто-нибудь может дать какую-нибудь идею?

Комментарии:

1. Не могли бы вы показать, что вы уже сделали?

2. Не могли бы вы просмотреть каждую запись и вставить все поля, когда запись не существует, а затем обновить последние просмотренные поля, когда она есть? Вам все равно придется проверять наличие в таблице… В принципе, в непрерывной теме Kafka нет «последнего события», поэтому думать об этом таким образом, вероятно, ошибочно

3. Я не согласен, последнее в непрерывном потоке четко определено. Это просто означает «самый последний, основанный на некотором понятии времени».

4. @ArvidHeise, извините, я понятия не имел, как справиться с этой проблемой, поэтому я дал сценарий и надеялся получить какой-нибудь совет.

5. @Onecricket, да, я тоже рассматривал эту схему, но это вызовет большую нагрузку на БД. Кроме того, операция ОБНОВЛЕНИЯ сложна.

Ответ №1:

Дедупликация с упорядочением по времени строки была бы самым простым способом, но это поддерживается в 1.12. https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/sql/queries.html#deduplication

 CREATE TABLE src (
  word STRING,
  eventtime TIMESTAMP(3),
  appear_page INT,
  WATERMARK FOR eventtime AS eventtime - INTERVAL '1' SECOND
) WITH (
  'connector' = 'kafka',
  ...
);

-- get last row by word key
SELECT word, eventtime, appear_page
FROM (
  SELECT *, ROW_NUMBER() OVER (PARTITION BY word ORDER BY eventtime DESC) AS rownum
  FROM src
) WHERE rownum = 1;

 

Этот запрос также должен работать в 1.11, но оптимизирован не для дедупликации, а для оператора TopN, который менее эффективен.

Комментарии:

1. Спасибо, он может получить первую запись и последнюю запись каждого слова отдельно, но, похоже, не может получить их одновременно в Flink SQL. В то время как Postgre SQL может справиться с этим, это может быть ограничением Flink SQL.

2. @Sky, извините, что вы имеете в виду «не можете получить их одновременно в Flink SQL»? Вы имеете в виду, что приведенный выше запрос не может работать в Flink SQL?

3. это работает в Flink SQL. Я имею в виду, что мы можем получить только первую запись или последнюю запись каждого слова в каждый момент времени с помощью вышеуказанного метода. Но я хочу получить первую запись и последнюю запись каждого слова в одном SQL. например: выберите word, eventtime, appear_page from( выберите *, row_number() over (раздел по порядку слов по eventtime desc) как rownum_last, row_number() over (раздел по порядку слов поeventtime asc) как rownum_first) где rownum_last = 1 или row_number_first = 1; Flink SQL не поддерживает способ с функцией two WINDOW, но GP в порядке.

4. О, я вижу, мы можем с помощью предложения SQL JOIN справиться с ситуацией.

5. На самом деле, я решил эту проблему с помощью Java DataStream API позже. Еще раз спасибо, вы даете мне новое решение.