Есть уникальное событие Kafka ksqlDB, когда в материализованном представлении появляется новое МАКСИМАЛЬНОЕ значение

#apache-kafka #ksqldb #event-driven-design

Вопрос:

Путаюсь в событиях ksqlDB и материализованных представлениях. Я хочу получать уведомления в специальной теме / потоке, когда возникает новая высокая цена акций, как определено в сводном / материализованном представлении таблицы, но я получаю информацию о высокой цене для каждого события, а не только при появлении нового события с высокой ценой.

Вот мой рабочий пример / настройка.

Создайте базовый поток и тему для акций.

 ksql> create stream stocks (symbol VARCHAR KEY, company VARCHAR, price DECIMAL(9, 2))
>  WITH (KAFKA_TOPIC='stocks', PARTITIONS=1, VALUE_FORMAT='json');

Message
----------------
Stream created
----------------
 

добавьте некоторые исходные данные о ценах акций Acme Corp

 ksql> insert into stocks (symbol, company, price) values ('ACME', 'Acme Corp', 111.11);
ksql> insert into stocks (symbol, company, price) values ('ACME', 'Acme Corp', 111.12);
ksql> insert into stocks (symbol, company, price) values ('ACME', 'Acme Corp', 111.13);
 

Распечатайте основную тему, чтобы доказать наличие данных.

 ksql> print 'stocks' from beginning limit 3;
Key format: KAFKA_INT or KAFKA_STRING
Value format: JSON or KAFKA_STRING
rowtime: 2021/03/21 14:26:57.169 Z, key: 1094929733, value: {"COMPANY":"Acme Corp","PRICE":111.11}
rowtime: 2021/03/21 14:27:01.717 Z, key: 1094929733, value: {"COMPANY":"Acme Corp","PRICE":111.12}
rowtime: 2021/03/21 14:27:04.546 Z, key: 1094929733, value: {"COMPANY":"Acme Corp","PRICE":111.13}
Topic printing ceased
 

Создайте агрегированное / материализованное представление, чтобы показать самые высокие цены на акции.

 ksql> create table stock_highs as
>  select symbol, max(price) as high
>  from stocks
>  group by symbol
>  emit changes;

 Message
--------------------------------------------
 Created query with ID CTAS_STOCK_HIGHS_115
--------------------------------------------
 

Запросите его для визуального осмотра.

 ksql> select * from stock_highs where symbol = 'ACME';
 -------------- ------------------------ 
|SYMBOL        |HIGH                    |
 -------------- ------------------------ 
|ACME          |111.13                  |
Query terminated
 

Используйте consumer (он же print STOCK_HIGHS) в отдельном терминале (терминал 2), чтобы следить за изменениями высокой цены акций.

 ksql> print STOCK_HIGHS from beginning;
Key format: KAFKA_INT or KAFKA_STRING
Value format: JSON or KAFKA_STRING
rowtime: 2021/03/21 14:27:04.546 Z, key: 1094929733, value: {"HIGH":111.13}
 

Back in original terminal (terminal 1) with ksql client insert more data to force the stock high to be updated.

 ksql> insert into stocks (symbol, company, price) values ('ACME', 'Acme Corp', 111.20);
ksql> insert into stocks (symbol, company, price) values ('ACME', 'Acme Corp', 111.15);
 

The above inserts should give a new high price of 111.20 and ignore the 111.15 price so, in my thinking and use case I’d want to get an event / message from the STOCK_HIGHS topic showing a new high stock price of 111.20 but not the 111.15 price. However, I get two new events in the consumer (terminal 2).

 ksql> print STOCK_HIGHS from beginning;
Key format: KAFKA_INT or KAFKA_STRING
Value format: JSON or KAFKA_STRING
rowtime: 2021/03/21 14:27:04.546 Z, key: 1094929733, value: {"HIGH":111.13}
rowtime: 2021/03/21 14:37:51.234 Z, key: 1094929733, value: {"HIGH":111.20}
rowtime: 2021/03/21 14:39:03.301 Z, key: 1094929733, value: {"HIGH":111.20}
 

Проблема в том, что я действительно хочу, чтобы меня уведомляли или «проверяли» только тогда, когда возникают новые высокие цены, таким образом, я могу заставить потребителя уйти и сделать что-то значимое, когда установлена новая высокая цена.