#apache-kafka #ksqldb #event-driven-design
Вопрос:
Путаюсь в событиях ksqlDB и материализованных представлениях. Я хочу получать уведомления в специальной теме / потоке, когда возникает новая высокая цена акций, как определено в сводном / материализованном представлении таблицы, но я получаю информацию о высокой цене для каждого события, а не только при появлении нового события с высокой ценой.
Вот мой рабочий пример / настройка.
Создайте базовый поток и тему для акций.
ksql> create stream stocks (symbol VARCHAR KEY, company VARCHAR, price DECIMAL(9, 2))
> WITH (KAFKA_TOPIC='stocks', PARTITIONS=1, VALUE_FORMAT='json');
Message
----------------
Stream created
----------------
добавьте некоторые исходные данные о ценах акций Acme Corp
ksql> insert into stocks (symbol, company, price) values ('ACME', 'Acme Corp', 111.11);
ksql> insert into stocks (symbol, company, price) values ('ACME', 'Acme Corp', 111.12);
ksql> insert into stocks (symbol, company, price) values ('ACME', 'Acme Corp', 111.13);
Распечатайте основную тему, чтобы доказать наличие данных.
ksql> print 'stocks' from beginning limit 3;
Key format: KAFKA_INT or KAFKA_STRING
Value format: JSON or KAFKA_STRING
rowtime: 2021/03/21 14:26:57.169 Z, key: 1094929733, value: {"COMPANY":"Acme Corp","PRICE":111.11}
rowtime: 2021/03/21 14:27:01.717 Z, key: 1094929733, value: {"COMPANY":"Acme Corp","PRICE":111.12}
rowtime: 2021/03/21 14:27:04.546 Z, key: 1094929733, value: {"COMPANY":"Acme Corp","PRICE":111.13}
Topic printing ceased
Создайте агрегированное / материализованное представление, чтобы показать самые высокие цены на акции.
ksql> create table stock_highs as
> select symbol, max(price) as high
> from stocks
> group by symbol
> emit changes;
Message
--------------------------------------------
Created query with ID CTAS_STOCK_HIGHS_115
--------------------------------------------
Запросите его для визуального осмотра.
ksql> select * from stock_highs where symbol = 'ACME';
-------------- ------------------------
|SYMBOL |HIGH |
-------------- ------------------------
|ACME |111.13 |
Query terminated
Используйте consumer (он же print STOCK_HIGHS) в отдельном терминале (терминал 2), чтобы следить за изменениями высокой цены акций.
ksql> print STOCK_HIGHS from beginning;
Key format: KAFKA_INT or KAFKA_STRING
Value format: JSON or KAFKA_STRING
rowtime: 2021/03/21 14:27:04.546 Z, key: 1094929733, value: {"HIGH":111.13}
Back in original terminal (terminal 1) with ksql client insert more data to force the stock high to be updated.
ksql> insert into stocks (symbol, company, price) values ('ACME', 'Acme Corp', 111.20);
ksql> insert into stocks (symbol, company, price) values ('ACME', 'Acme Corp', 111.15);
The above inserts should give a new high price of 111.20 and ignore the 111.15 price so, in my thinking and use case I’d want to get an event / message from the STOCK_HIGHS topic showing a new high stock price of 111.20 but not the 111.15 price. However, I get two new events in the consumer (terminal 2).
ksql> print STOCK_HIGHS from beginning;
Key format: KAFKA_INT or KAFKA_STRING
Value format: JSON or KAFKA_STRING
rowtime: 2021/03/21 14:27:04.546 Z, key: 1094929733, value: {"HIGH":111.13}
rowtime: 2021/03/21 14:37:51.234 Z, key: 1094929733, value: {"HIGH":111.20}
rowtime: 2021/03/21 14:39:03.301 Z, key: 1094929733, value: {"HIGH":111.20}
Проблема в том, что я действительно хочу, чтобы меня уведомляли или «проверяли» только тогда, когда возникают новые высокие цены, таким образом, я могу заставить потребителя уйти и сделать что-то значимое, когда установлена новая высокая цена.