#join #struct #apache-kafka #ksqldb #debezium
Вопрос:
Мне нужно создать тему Кафки из комбинации девяти других тем, все они созданы Debezium PostgreSQL source connector в формате AVRO. Для начала я пытаюсь (пока безуспешно) объединить поля только из двух тем.
Итак, сначала создайте таблицу ksqlDB на основе темы «ЗАПРОС» :
ksql> CREATE TABLE TB_REQUEST (ID STRUCT<REQUEST_ID BIGINT> PRIMARY KEY)
WITH (KAFKA_TOPIC='REQUEST', FORMAT='AVRO');
И мне все кажется прекрасным:
ksql> DESCRIBE TB_REQUEST;
Name : TB_REQUEST
Field | Type
-----------------------------------------------------------------------------------------------------------------------
ID | STRUCT<REQUEST_ID BIGINT> (primary key)
BEFORE | STRUCT<REQUEST_ID BIGINT, REQUESTER_ID INTEGER, STATUS_ID>
AFTER | STRUCT<REQUEST_ID BIGINT, REQUESTER_ID INTEGER, STATUS_ID>
SOURCE | STRUCT<VERSION VARCHAR(STRING), CONNECTOR VARCHAR(STRING), NAME VARCHAR(STRING), TS_MS BIGINT, SNAPSHOT VARCHAR(STRING), DB VARCHAR(STRING), SEQUENCE VARCHAR(STRING), SCHEMA VARCHAR(STRING), TABLE VARCHAR(STRING), TXID BIGINT, LSN BIGINT, XMIN BIGINT>
OP | VARCHAR(STRING)
TS_MS | BIGINT
TRANSACTION | STRUCT<ID VARCHAR(STRING), TOTAL_ORDER BIGINT, DATA_COLLECTION_ORDER BIGINT>
-----------------------------------------------------------------------------------------------------------------------
For runtime statistics and query details run: DESCRIBE <Stream,Table> EXTENDED;
Затем я создаю другую таблицу из темы «СОТРУДНИК» :
ksql> CREATE TABLE TB_EMPLOYEE (ID STRUCT<EMPLOYEE_ID INT> PRIMARY KEY)
WITH (KAFKA_TOPIC='EMPLOYEE', FORMAT='AVRO');
Опять же, все кажется в порядке.
ksql> DESCRIBE TB_EMPLOYEE;
Name : TB_EMPLOYEE
Field | Type
-----------------------------------------------------------------------------------------------------------------------
ID | STRUCT<EMPLOYEE_ID INTEGER> (primary key)
BEFORE | STRUCT<EMPLOYEE_ID INTEGER, NAME VARCHAR(STRING), HIRING_DATE DATE>
AFTER | STRUCT<EMPLOYEE_ID INTEGER, NAME VARCHAR(STRING), HIRING_DATE DATE>
SOURCE | STRUCT<VERSION VARCHAR(STRING), CONNECTOR VARCHAR(STRING), NAME VARCHAR(STRING), TS_MS BIGINT, SNAPSHOT VARCHAR(STRING), DB VARCHAR(STRING), SEQUENCE VARCHAR(STRING), SCHEMA VARCHAR(STRING), TABLE VARCHAR(STRING), TXID BIGINT, LSN BIGINT, XMIN BIGINT>
OP | VARCHAR(STRING)
TS_MS | BIGINT
TRANSACTION | STRUCT<ID VARCHAR(STRING), TOTAL_ORDER BIGINT, DATA_COLLECTION_ORDER BIGINT>
-----------------------------------------------------------------------------------------------------------------------
For runtime statistics and query details run: DESCRIBE <Stream,Table> EXTENDED;
Но, попытавшись создать свою целевую таблицу, присоединившись к предыдущим по идентификатору сотрудника.
ksql> CREATE TABLE REQUEST_EMPLOYEE AS
SELECT RQ.ID->REQUEST_ID, RQ.AFTER->REQUESTER_ID, RQ.AFTER->STATUS_ID, EM.ID->EMPLOYEE_ID, EM.AFTER->NAME AS REQUESTER
FROM TB_REQUEST RQ
JOIN TB_EMPLOYEE EM ON RQ.AFTER->REQUESTER_ID = EM.ID->EMPLOYEE_ID;
Я получил следующую ошибку:
Could not determine output schema for query due to error: Invalid join condition: table-table joins require to join on the primary key of the right input table. Got RQ.AFTER->REQUESTER_ID = EM.ID->EMPLOYEE_ID.
Statement: CREATE TABLE REQUEST_EMPLOYEE WITH (KAFKA_TOPIC='REQUEST_EMPLOYEE', PARTITIONS=1, REPLICAS=1) AS SELECT
RQ.ID->REQUEST_ID REQUEST_ID,
RQ.AFTER->REQUESTER_ID REQUESTER_ID,
RQ.AFTER->STATUS_ID STATUS_ID,
EM.ID->EMPLOYEE_ID EMPLOYEE_ID,
EM.AFTER->NAME REQUESTER
FROM TB_REQUEST RQ
INNER JOIN TB_EMPLOYEE EM ON ((RQ.AFTER->REQUESTER_ID = EM.ID->EMPLOYEE_ID))
EMIT CHANGES;
Глядя на вывод команды «ОПИСАТЬ TB_EMPLOYEE», мне кажется, что «EM.ID->ИДЕНТИФИКАТОР СОТРУДНИКА» — это правильный выбор. Что я упускаю?
Заранее спасибо.
PS: версия ksqlDB в версии 0.21.0
Ответ №1:
Я думаю, что вы должны использовать по крайней мере один ключ строки в инструкции join, в предыдущих версиях KsqlDB единственным способом объединения таблиц были клавиши строк, в вашей текущей версии 0.21.0 это возможно с помощью внешнего ключа.
Проверьте следующий пример:
CREATE TABLE orders_with_users AS
SELECT * FROM orders JOIN users ON orders.u_id = users.u_id EMIT CHANGES;
Где u_id определяется как первичный ключ, таким образом, является ключом строки.
CREATE TABLE users (
u_id VARCHAR PRIMARY KEY
name VARCHAR
) WITH (
kafka_topic = 'users',
partitions = 3,
value_format = 'json'
);
Приведенное ниже предложение аналогично
CREATE TABLE orders_with_users AS
SELECT * FROM orders JOIN users ON orders.u_id = users.ROWKEY EMIT CHANGES;
Еще одно наблюдение: KsqlDB рассматривает ключ для вашего TB_EMPLOYE как СТРУКТУРУ<ЦЕЛОЕ ЧИСЛО ИДЕНТИФИКАТОРА СОТРУДНИКА>, а не только целое число.
затем ждет одного сравнения между структурами. (С той же схемой)
Затем вы можете выполнить следующие действия, прежде чем создавать свою таблицу.
CREATE STREAM STREAM_EMPLOYEE (ID STRUCT<EMPLOYEE_ID INT> KEY)
WITH (KAFKA_TOPIC='EMPLOYEE', FORMAT='AVRO');
CREATE STREAM STREAM_REKEY_EMPLOYEE
AS SELECT ID->EMPLOYEE_ID employee_id, * FROM STREAM_EMPLOYEE
PARTITION BY ID->EMPLOYEE_ID
EMIT CHANGES;
CREATE TABLE TB_EMPLOYEE (employee_id PRIMARY KEY)
WITH (KAFKA_TOPIC='STREAM_REKEY_EMPLOYEE', FORMAT='AVRO');
И используйте поле employee_id для присоединения, попробуйте использовать свои первичные ключи в качестве примитивных типов.
Комментарии:
1. Привет, Фелипе, спасибо за ответ. Ошибка была вызвана попыткой объединения с помощью структур. Ваше предложение о создании потока для выравнивания первичного ключа (СОЗДАТЬ ПОТОК STREAM_REKEY_EMPLOYEE …) сделало свое дело. Я создал два потока «рекейд», по одному для каждого источника, создал таблицы на основе этих потоков и смог создать объединенную таблицу, как и предполагалось изначально.
2. Именно структура ключей была проблемой, если вы считаете, что ответ был хорошим, пожалуйста, оцените ответ как полезный 🙂 , спасибо.
3. Я сделал. Но на момент написания этой статьи у меня нет репутации, чтобы отдать свой голос. 🙁