Нужно отфильтровать записи Kafka на основе определенного ключевого слова

#apache-kafka #ksqldb #lenses

#apache-kafka #ksqldb #линзы

Вопрос:

У меня есть тема Kafka, в которой содержится около 3 миллионов записей. Я хочу выбрать из этого единственную запись, которая имеет определенный параметр. Я пытался запросить это с помощью Lenses, но не смог сформировать правильный запрос. ниже приведено содержимое записи 1 сообщения.

 {
  "header": {
    "schemaVersionNo": "1",
  },
  "payload": {
    "modifiedDate": 1552334325212,
    "createdDate": 1552334325212,
    "createdBy": "A",
    "successful": true,
    "source_order_id": "1111111111111",
  }
}
  

Теперь я хочу отфильтровать запись с определенным source_order_id, но не могу найти правильный способ сделать это.
Мы также пробовали использовать lenses как инструмент Kafka.

Ниже приведен пример запроса, который мы опробовали в lenses:

 SELECT * FROM `TEST`
WHERE _vtype='JSON' AND _ktype='BYTES'
AND _sample=2 AND _sampleWindow=200 AND payload.createdBy='A'
  

Этот запрос работает, однако, если мы попытаемся использовать идентификатор источника, как показано ниже, мы получим ошибку:

 SELECT * FROM `TEST`
WHERE _vtype='JSON' AND _ktype='BYTES'
AND _sample=2 AND _sampleWindow=200 AND payload.source_order_id='1111111111111'



 Error : "Invalid syntax at line=3 and column=41.Invalid syntax for 'payload.source_order_id'. Field 'payload' resolves to primitive type STRING.
  

Использование всех 3 миллионов записей через пользовательского пользователя и последующее повторение этого не кажется мне оптимизированным подходом, поэтому ищу любые доступные решения для такого варианта использования.

Комментарии:

1. Что Apacha Lenses ? Вы имеете в виду Lenses и их язык Lenses SQL?

2. Обязательно ли решение, которое вы создаете, должно основываться на линзах? Я могу дать вам ответ на основе KSQL, если это вам полезно.

3. @RobinMoffatt: docs.lenses.io/overview/lenses-kafka.html

4. @RobinMoffatt: Не обязательно, подойдет любое решение.

Ответ №1:

Поскольку вы сказали, что открыты для других решений, вот одно, созданное с использованием KSQL.

Сначала давайте добавим несколько примеров записей в исходный раздел:

 $ kafkacat -P -b localhost:9092 -t TEST <<EOF
{ "header": { "schemaVersionNo": "1" }, "payload": { "modifiedDate": 1552334325212, "createdDate": 1552334325212, "createdBy": "A", "successful": true, "source_order_id": "3411976933214" } }
{ "header": { "schemaVersionNo": "1" }, "payload": { "modifiedDate": 1552334325412, "createdDate": 1552334325412, "createdBy": "B", "successful": true, "source_order_id": "3411976933215" } }
{ "header": { "schemaVersionNo": "1" }, "payload": { "modifiedDate": 1552334325612, "createdDate": 1552334325612, "createdBy": "C", "successful": true, "source_order_id": "3411976933216" } }
EOF
  

Используя KSQL, мы можем проверить тему с помощью PRINT :

 ksql> PRINT 'TEST' FROM BEGINNING;
Format:JSON
{"ROWTIME":1552476232988,"ROWKEY":"null","header":{"schemaVersionNo":"1"},"payload":{"modifiedDate":1552334325212,"createdDate":1552334325212,"createdBy":"A","successful":true,"source_order_id":"3411976933214"}}
{"ROWTIME":1552476232988,"ROWKEY":"null","header":{"schemaVersionNo":"1"},"payload":{"modifiedDate":1552334325412,"createdDate":1552334325412,"createdBy":"B","successful":true,"source_order_id":"3411976933215"}}
{"ROWTIME":1552476232988,"ROWKEY":"null","header":{"schemaVersionNo":"1"},"payload":{"modifiedDate":1552334325612,"createdDate":1552334325612,"createdBy":"C","successful":true,"source_order_id":"3411976933216"}}
  

Затем объявите схему по теме, которая позволяет нам запускать SQL против нее:

 ksql> CREATE STREAM TEST (header STRUCT<schemaVersionNo VARCHAR>, 
                          payload STRUCT<modifiedDate BIGINT, 
                                        createdDate BIGINT, 
                                        createdBy VARCHAR, 
                                        successful BOOLEAN, 
                                        source_order_id VARCHAR>) 
                          WITH (KAFKA_TOPIC='TEST', 
                                VALUE_FORMAT='JSON');

Message
----------------
Stream created
----------------
  

Скажите KSQL, чтобы он работал со всеми данными в теме:

 ksql> SET 'auto.offset.reset' = 'earliest';
Successfully changed local property 'auto.offset.reset' to 'earliest'. Use the UNSET command to revert your change.
  

И теперь мы можем выбрать все данные:

 ksql> SELECT * FROM TEST;
1552475910106 | null | {SCHEMAVERSIONNO=1} | {MODIFIEDDATE=1552334325212, CREATEDDATE=1552334325212, CREATEDBY=A, SUCCESSFUL=true, SOURCE_ORDER_ID=3411976933214}
1552475910106 | null | {SCHEMAVERSIONNO=1} | {MODIFIEDDATE=1552334325412, CREATEDDATE=1552334325412, CREATEDBY=B, SUCCESSFUL=true, SOURCE_ORDER_ID=3411976933215}
1552475910106 | null | {SCHEMAVERSIONNO=1} | {MODIFIEDDATE=1552334325612, CREATEDDATE=1552334325612, CREATEDBY=C, SUCCESSFUL=true, SOURCE_ORDER_ID=3411976933216}
^CQuery terminated
  

или мы можем выборочно запросить его, используя -> обозначение для доступа к вложенным полям в схеме:

 ksql> SELECT * FROM TEST 
        WHERE PAYLOAD->CREATEDBY='A';
1552475910106 | null | {SCHEMAVERSIONNO=1} | {MODIFIEDDATE=1552334325212, CREATEDDATE=1552334325212, CREATEDBY=A, SUCCESSFUL=true, SOURCE_ORDER_ID=3411976933214}
  

Помимо выбора всех записей, вы можете вернуть только интересующие поля:

 ksql> SELECT payload FROM TEST 
        WHERE PAYLOAD->source_order_id='3411976933216';
{MODIFIEDDATE=1552334325612, CREATEDDATE=1552334325612, CREATEDBY=C, SUCCESSFUL=true, SOURCE_ORDER_ID=3411976933216}
  

С помощью KSQL вы можете записать результаты любого SELECT оператора в новую тему, которая заполняет ее всеми существующими сообщениями вместе с каждым новым сообщением в исходной теме, отфильтрованным и обработанным в соответствии с объявленным SELECT оператором:

 ksql> CREATE STREAM TEST_CREATED_BY_A AS
        SELECT * FROM TEST WHERE PAYLOAD->CREATEDBY='A';

Message
----------------------------
Stream created and running
----------------------------
  

Раздел списка в кластере Kafka:

 ksql> SHOW TOPICS;

Kafka Topic            | Registered | Partitions | Partition Replicas | Consumers | ConsumerGroups
----------------------------------------------------------------------------------------------------
orders                 | true       | 1          | 1                  | 1         | 1
pageviews              | false      | 1          | 1                  | 0         | 0
products               | true       | 1          | 1                  | 1         | 1
TEST                   | true       | 1          | 1                  | 1         | 1
TEST_CREATED_BY_A      | true       | 4          | 1                  | 0         | 0
  

Распечатайте содержимое нового раздела:

 ksql> PRINT 'TEST_CREATED_BY_A' FROM BEGINNING;
Format:JSON
{"ROWTIME":1552475910106,"ROWKEY":"null","HEADER":{"SCHEMAVERSIONNO":"1"},"PAYLOAD":{"MODIFIEDDATE":1552334325212,"CREATEDDATE":1552334325212,"CREATEDBY":"A","SUCCESSFUL":true,"SOURCE_ORDER_ID":"3411976933214"}}