Ошибка «Недопустимое условие соединения: для соединения таблиц требуется соединение по первичному ключу правой входной таблицы» при соединении двух таблиц в Kafka ksqlDB

#join #struct #apache-kafka #ksqldb #debezium

Вопрос:

Мне нужно создать тему Кафки из комбинации девяти других тем, все они созданы Debezium PostgreSQL source connector в формате AVRO. Для начала я пытаюсь (пока безуспешно) объединить поля только из двух тем.

Итак, сначала создайте таблицу ksqlDB на основе темы «ЗАПРОС» :

 ksql> CREATE TABLE TB_REQUEST (ID STRUCT<REQUEST_ID BIGINT> PRIMARY KEY)
         WITH (KAFKA_TOPIC='REQUEST', FORMAT='AVRO');
 

И мне все кажется прекрасным:

 ksql> DESCRIBE TB_REQUEST;

Name                 : TB_REQUEST
 Field       | Type
-----------------------------------------------------------------------------------------------------------------------

 ID          | STRUCT<REQUEST_ID BIGINT> (primary key)

 BEFORE      | STRUCT<REQUEST_ID BIGINT, REQUESTER_ID INTEGER, STATUS_ID>

 AFTER       | STRUCT<REQUEST_ID BIGINT, REQUESTER_ID INTEGER, STATUS_ID>
 
 SOURCE      | STRUCT<VERSION VARCHAR(STRING), CONNECTOR VARCHAR(STRING), NAME VARCHAR(STRING), TS_MS BIGINT, SNAPSHOT VARCHAR(STRING), DB VARCHAR(STRING), SEQUENCE VARCHAR(STRING), SCHEMA VARCHAR(STRING), TABLE VARCHAR(STRING), TXID BIGINT, LSN BIGINT, XMIN BIGINT>

 OP          | VARCHAR(STRING)

 TS_MS       | BIGINT

 TRANSACTION | STRUCT<ID VARCHAR(STRING), TOTAL_ORDER BIGINT, DATA_COLLECTION_ORDER BIGINT>

-----------------------------------------------------------------------------------------------------------------------
For runtime statistics and query details run: DESCRIBE <Stream,Table> EXTENDED;
 

Затем я создаю другую таблицу из темы «СОТРУДНИК» :

 ksql> CREATE TABLE TB_EMPLOYEE (ID STRUCT<EMPLOYEE_ID INT> PRIMARY KEY)
         WITH (KAFKA_TOPIC='EMPLOYEE', FORMAT='AVRO');
 

Опять же, все кажется в порядке.

 ksql> DESCRIBE TB_EMPLOYEE;

Name                 : TB_EMPLOYEE
 Field       | Type                                                       
-----------------------------------------------------------------------------------------------------------------------
 ID          | STRUCT<EMPLOYEE_ID INTEGER> (primary key)

 BEFORE      | STRUCT<EMPLOYEE_ID INTEGER, NAME VARCHAR(STRING), HIRING_DATE DATE>
 
 AFTER       | STRUCT<EMPLOYEE_ID INTEGER, NAME VARCHAR(STRING), HIRING_DATE DATE>
 
 SOURCE      | STRUCT<VERSION VARCHAR(STRING), CONNECTOR VARCHAR(STRING), NAME VARCHAR(STRING), TS_MS BIGINT, SNAPSHOT VARCHAR(STRING), DB VARCHAR(STRING), SEQUENCE VARCHAR(STRING), SCHEMA VARCHAR(STRING), TABLE VARCHAR(STRING), TXID BIGINT, LSN BIGINT, XMIN BIGINT>

 OP          | VARCHAR(STRING)

 TS_MS       | BIGINT

 TRANSACTION | STRUCT<ID VARCHAR(STRING), TOTAL_ORDER BIGINT, DATA_COLLECTION_ORDER BIGINT>

-----------------------------------------------------------------------------------------------------------------------
For runtime statistics and query details run: DESCRIBE <Stream,Table> EXTENDED;
 

Но, попытавшись создать свою целевую таблицу, присоединившись к предыдущим по идентификатору сотрудника.

 ksql> CREATE TABLE REQUEST_EMPLOYEE AS 
         SELECT RQ.ID->REQUEST_ID, RQ.AFTER->REQUESTER_ID, RQ.AFTER->STATUS_ID, EM.ID->EMPLOYEE_ID, EM.AFTER->NAME AS REQUESTER
         FROM TB_REQUEST RQ
         JOIN TB_EMPLOYEE EM ON RQ.AFTER->REQUESTER_ID = EM.ID->EMPLOYEE_ID;
 

Я получил следующую ошибку:

 Could not determine output schema for query due to error: Invalid join condition: table-table joins require to join on the primary key of the right input table. Got RQ.AFTER->REQUESTER_ID = EM.ID->EMPLOYEE_ID.
Statement: CREATE TABLE REQUEST_EMPLOYEE WITH (KAFKA_TOPIC='REQUEST_EMPLOYEE', PARTITIONS=1, REPLICAS=1) AS SELECT
  RQ.ID->REQUEST_ID REQUEST_ID,
  RQ.AFTER->REQUESTER_ID REQUESTER_ID,
  RQ.AFTER->STATUS_ID STATUS_ID,
  EM.ID->EMPLOYEE_ID EMPLOYEE_ID,
  EM.AFTER->NAME REQUESTER
FROM TB_REQUEST RQ
INNER JOIN TB_EMPLOYEE EM ON ((RQ.AFTER->REQUESTER_ID = EM.ID->EMPLOYEE_ID))
EMIT CHANGES;
 

Глядя на вывод команды «ОПИСАТЬ TB_EMPLOYEE», мне кажется, что «EM.ID->ИДЕНТИФИКАТОР СОТРУДНИКА» — это правильный выбор. Что я упускаю?

Заранее спасибо.

PS: версия ksqlDB в версии 0.21.0

Ответ №1:

Я думаю, что вы должны использовать по крайней мере один ключ строки в инструкции join, в предыдущих версиях KsqlDB единственным способом объединения таблиц были клавиши строк, в вашей текущей версии 0.21.0 это возможно с помощью внешнего ключа.

Проверьте следующий пример:

 CREATE TABLE orders_with_users AS
SELECT * FROM orders JOIN users ON orders.u_id = users.u_id EMIT CHANGES;
 

Где u_id определяется как первичный ключ, таким образом, является ключом строки.

 CREATE TABLE users (
    u_id VARCHAR PRIMARY KEY
    name VARCHAR
  ) WITH (
    kafka_topic = 'users',
    partitions = 3,
    value_format = 'json'
  );
 

Приведенное ниже предложение аналогично

 CREATE TABLE orders_with_users AS
    SELECT * FROM orders JOIN users ON orders.u_id = users.ROWKEY EMIT CHANGES;
 

Еще одно наблюдение: KsqlDB рассматривает ключ для вашего TB_EMPLOYE как СТРУКТУРУ<ЦЕЛОЕ ЧИСЛО ИДЕНТИФИКАТОРА СОТРУДНИКА>, а не только целое число.
затем ждет одного сравнения между структурами. (С той же схемой)

Затем вы можете выполнить следующие действия, прежде чем создавать свою таблицу.

 CREATE STREAM STREAM_EMPLOYEE (ID STRUCT<EMPLOYEE_ID INT> KEY)
         WITH (KAFKA_TOPIC='EMPLOYEE', FORMAT='AVRO');

CREATE STREAM STREAM_REKEY_EMPLOYEE 
AS SELECT ID->EMPLOYEE_ID employee_id, * FROM STREAM_EMPLOYEE
PARTITION BY ID->EMPLOYEE_ID
EMIT CHANGES;


CREATE TABLE TB_EMPLOYEE (employee_id PRIMARY KEY)
         WITH (KAFKA_TOPIC='STREAM_REKEY_EMPLOYEE', FORMAT='AVRO');
 

И используйте поле employee_id для присоединения, попробуйте использовать свои первичные ключи в качестве примитивных типов.

Комментарии:

1. Привет, Фелипе, спасибо за ответ. Ошибка была вызвана попыткой объединения с помощью структур. Ваше предложение о создании потока для выравнивания первичного ключа (СОЗДАТЬ ПОТОК STREAM_REKEY_EMPLOYEE …) сделало свое дело. Я создал два потока «рекейд», по одному для каждого источника, создал таблицы на основе этих потоков и смог создать объединенную таблицу, как и предполагалось изначально.

2. Именно структура ключей была проблемой, если вы считаете, что ответ был хорошим, пожалуйста, оцените ответ как полезный 🙂 , спасибо.

3. Я сделал. Но на момент написания этой статьи у меня нет репутации, чтобы отдать свой голос. 🙁