#apache-kafka #apache-flink #flink-sql
#апачи-кафка #apache-flink #flink-sql
Вопрос:
Flink SQL
, как получить первую запись и последнюю запись по времени события в Kafka
потоке данных и сохранить ее в БД (например, MySQL
)?
Кроме того, если поступает новая запись в Kafka
потоке данных, мы должны обновить эту запись MySQL
.
- Предположение, записи в
Kafka
нем следующие:
{'word': 'hello', 'eventtime': '2020-12-04 16:00:00', 'appear_page': 5}
{'word': 'hello', 'eventtime': '2020-12-04 16:05:00', 'appear_page': 10}
{'word': 'hello', 'eventtime': '2020-12-04 16:10:00', 'appear_page': 15}
{'word': 'are', 'eventtime': '2020-12-04 16:05:00', 'appear_page': 15}
{'word': 'hello', 'eventtime': '2020-12-04 16:15:00', 'appear_page': 20}
{'word': 'are', 'eventtime': '2020-12-04 16:00:00', 'appear_page': 12}
{'word': 'are', 'eventtime': '2020-12-04 16:10:00''appear_page': 18}
- Таким образом
Flink SQL
, результаты , которых я ожидал , следующие:
{'word': 'hello', 'eventtime': '2020-12-04 16:00:00', 'appear_page': 5}
{'word': 'hello', 'eventtime': '2020-12-04 16:15:00', 'appear_page': 20}
{'word': 'are', 'eventtime': '2020-12-04 16:00:00', 'appear_page': 12}
{'word': 'are', 'eventtime': '2020-12-04 16:10:00', 'appear_page': 18}
- Мы сохраняем эти записи в
MySQL
, предполагая , что результат будет следующим,
| word | first_appearance_time | first_appearance_page | last_appearance_time | last_appearance_page |
| hello | 2020-12-04 16:00:00 | 5 | 2020-12-04 16:15:00 | 20 |
| are | 2020-12-04 16:00:00 | 12 | 2020-12-04 16:10:00 | 18 |
- Если грядет новая запись
Kafka
,
{'word': 'are', 'eventtime': '2020-12-04 17:18:00', 'appear_page': 30}
- Я надеюсь, что мы сможем обновить запись
are
inMySQL
, результат обновления выглядит следующим образом:
| word | first_appearance_time | first_appearance_page | last_appearance_time | last_appearance_page |
| hello | 2020-12-04 16:00:00 | 5 | 2020-12-04 16:15:00 | 20 |
| are | 2020-12-04 16:00:00 | 12 | 2020-12-04 17:18:00 | 30 |
У меня возникли некоторые проблемы на 2-м и 5-м этапах, кто-нибудь может дать какую-нибудь идею?
Комментарии:
1. Не могли бы вы показать, что вы уже сделали?
2. Не могли бы вы просмотреть каждую запись и вставить все поля, когда запись не существует, а затем обновить последние просмотренные поля, когда она есть? Вам все равно придется проверять наличие в таблице… В принципе, в непрерывной теме Kafka нет «последнего события», поэтому думать об этом таким образом, вероятно, ошибочно
3. Я не согласен, последнее в непрерывном потоке четко определено. Это просто означает «самый последний, основанный на некотором понятии времени».
4. @ArvidHeise, извините, я понятия не имел, как справиться с этой проблемой, поэтому я дал сценарий и надеялся получить какой-нибудь совет.
5. @Onecricket, да, я тоже рассматривал эту схему, но это вызовет большую нагрузку на БД. Кроме того, операция ОБНОВЛЕНИЯ сложна.
Ответ №1:
Дедупликация с упорядочением по времени строки была бы самым простым способом, но это поддерживается в 1.12. https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/sql/queries.html#deduplication
CREATE TABLE src (
word STRING,
eventtime TIMESTAMP(3),
appear_page INT,
WATERMARK FOR eventtime AS eventtime - INTERVAL '1' SECOND
) WITH (
'connector' = 'kafka',
...
);
-- get last row by word key
SELECT word, eventtime, appear_page
FROM (
SELECT *, ROW_NUMBER() OVER (PARTITION BY word ORDER BY eventtime DESC) AS rownum
FROM src
) WHERE rownum = 1;
Этот запрос также должен работать в 1.11, но оптимизирован не для дедупликации, а для оператора TopN, который менее эффективен.
Комментарии:
1. Спасибо, он может получить первую запись и последнюю запись каждого слова отдельно, но, похоже, не может получить их одновременно в Flink SQL. В то время как Postgre SQL может справиться с этим, это может быть ограничением Flink SQL.
2. @Sky, извините, что вы имеете в виду «не можете получить их одновременно в Flink SQL»? Вы имеете в виду, что приведенный выше запрос не может работать в Flink SQL?
3. это работает в Flink SQL. Я имею в виду, что мы можем получить только первую запись или последнюю запись каждого слова в каждый момент времени с помощью вышеуказанного метода. Но я хочу получить первую запись и последнюю запись каждого слова в одном SQL. например: выберите word, eventtime, appear_page from( выберите *, row_number() over (раздел по порядку слов по eventtime desc) как rownum_last, row_number() over (раздел по порядку слов поeventtime asc) как rownum_first) где rownum_last = 1 или row_number_first = 1; Flink SQL не поддерживает способ с функцией two WINDOW, но GP в порядке.
4. О, я вижу, мы можем с помощью предложения SQL JOIN справиться с ситуацией.
5. На самом деле, я решил эту проблему с помощью Java DataStream API позже. Еще раз спасибо, вы даете мне новое решение.