Правая сторона KSQLDB внутренних объединенных потоков всегда равна нулю

#apache-kafka #ksqldb

#apache-kafka #ksqldb

Вопрос:

Запуск версии KSQLDB: 0.12.0

У меня проблема с объединенными потоками.

При создании потока в качестве оконного запроса внутреннего соединения все поля правой стороны равны нулю, даже те, которые являются частью условия соединения. При независимом выполнении одного и того же запроса я обычно получаю поля.

Итак, вот настройка:

Запуск сервера с помощью docker-compose:

 services:
  ksqldb-server:
    image: confluentinc/ksqldb-server:0.12.0
    hostname: ksqldb-server
    container_name: ksqldb-server
    ports:
      - "8088:8088"
    environment:
      KSQL_LISTENERS: http://0.0.0.0:8088
      KSQL_BOOTSTRAP_SERVERS: mybroker:9092
      KSQL_KSQL_LOGGING_PROCESSING_STREAM_AUTO_CREATE: "true"
      KSQL_KSQL_LOGGING_PROCESSING_TOPIC_AUTO_CREATE: "true"
      KSQL_KSQL_SCHEMA_REGISTRY_URL: http://myregistry:8081
      KSQL_KSQL_STREAMS_AUTO_OFFSET_RESET: 'earliest'
  

Определения потоков и соединений

 CREATE STREAM generic_orders (
    ...
    clientrequestid_ string KEY,
    ...
)   WITH (
    KAFKA_TOPIC='thetopic',
    VALUE_FORMAT='AVRO'
);

CREATE STREAM specific_order (
    originatoruserid_ string,
    request_clientid_ bigint,
    bidquoteinfo_volume_ bigint
)   WITH (
    KAFKA_TOPIC='anothertopic',
    VALUE_FORMAT='AVRO'
);
  

При соединении двух вышеуказанных потоков это не сработало, я также попытался создать request_clientid_ ключ.
Я подозревал, что это не сработает из-за различий в типах clientrequestid_ и request_clientid_ , поэтому я создал другой поток:

 CREATE STREAM specific_order_typed AS
    select originatoruserid_ ,
    CAST(request_clientid_ AS string) as request_clientid_ KEY, 
    bidquoteinfo_volume_ 
    FROM ice_massquote_order
    EMIT CHANGES
    ;
  

Это не помогло…

Вот объединенный поток:

 CREATE STREAM enriched_orders AS
    SELECT i.request_clientid_ as reqid, o.clientrequestid_ as orderreq  FROM specific_order_typed i
    INNER JOIN generic_order o WITHIN 1 HOURS ON o.clientrequestid_ = i.request_clientid_ 
    EMIT CHANGES;
  

Также пытался поменять местами поток from и поток join … Результаты всегда нулевые:

 |623562762 |null
...
  

При непосредственном выполнении запроса я получаю то, что ожидал

     SELECT i.request_clientid_ as reqid, o.clientrequestid_ as orderreq  FROM specific_order_typed i
    INNER JOIN generic_order o WITHIN 1 HOURS ON o.clientrequestid_ = i.request_clientid_ 
    EMIT CHANGES;
  
 |623562762 |623562762
  

У кого-нибудь есть представление о том, что происходит? Я исчерпал всю идею, которая у меня была

Комментарии:

1. Являются thetopic ли фактические ключи сообщения строкой, которая содержит clientrequestid_ ? Если PRINT thetopic вы должны увидеть ключевые значения.

Ответ №1:

Поэтому я решил свою проблему, понизив рейтинг до 0.10.2.

Первое отличие, которое я заметил, это то, что при попытке присоединиться к исходным потокам generic_orders и specific_order я получаю правильную ошибку для сравнения между string и bigint

 CREATE STREAM generic_orders (
    ...
    clientrequestid_ string,
    ...
)   WITH (
    KAFKA_TOPIC='thetopic',
    VALUE_FORMAT='AVRO'
);

CREATE STREAM specific_order (
    originatoruserid_ string,
    request_clientid_ bigint,
    bidquoteinfo_volume_ bigint
)   WITH (
    KAFKA_TOPIC='anothertopic',
    VALUE_FORMAT='AVRO'
);
  

Затем мне просто нужно было преобразовать bigint в строку (я уже пробовал это с 0.12).:

 CREATE STREAM enriched_orders AS
    SELECT * FROM specific_order_typed i
    INNER JOIN generic_order o WITHIN 1 HOURS ON o.clientrequestid_ = CAST(i.request_clientid_ AS String)
    EMIT CHANGES;
  

И, наконец, значения больше не равны нулю при чтении этого потока.