#apache-kafka #ksqldb
#apache-kafka #ksqldb
Вопрос:
Запуск версии KSQLDB: 0.12.0
У меня проблема с объединенными потоками.
При создании потока в качестве оконного запроса внутреннего соединения все поля правой стороны равны нулю, даже те, которые являются частью условия соединения. При независимом выполнении одного и того же запроса я обычно получаю поля.
Итак, вот настройка:
Запуск сервера с помощью docker-compose:
services:
ksqldb-server:
image: confluentinc/ksqldb-server:0.12.0
hostname: ksqldb-server
container_name: ksqldb-server
ports:
- "8088:8088"
environment:
KSQL_LISTENERS: http://0.0.0.0:8088
KSQL_BOOTSTRAP_SERVERS: mybroker:9092
KSQL_KSQL_LOGGING_PROCESSING_STREAM_AUTO_CREATE: "true"
KSQL_KSQL_LOGGING_PROCESSING_TOPIC_AUTO_CREATE: "true"
KSQL_KSQL_SCHEMA_REGISTRY_URL: http://myregistry:8081
KSQL_KSQL_STREAMS_AUTO_OFFSET_RESET: 'earliest'
Определения потоков и соединений
CREATE STREAM generic_orders (
...
clientrequestid_ string KEY,
...
) WITH (
KAFKA_TOPIC='thetopic',
VALUE_FORMAT='AVRO'
);
CREATE STREAM specific_order (
originatoruserid_ string,
request_clientid_ bigint,
bidquoteinfo_volume_ bigint
) WITH (
KAFKA_TOPIC='anothertopic',
VALUE_FORMAT='AVRO'
);
При соединении двух вышеуказанных потоков это не сработало, я также попытался создать request_clientid_
ключ.
Я подозревал, что это не сработает из-за различий в типах clientrequestid_
и request_clientid_
, поэтому я создал другой поток:
CREATE STREAM specific_order_typed AS
select originatoruserid_ ,
CAST(request_clientid_ AS string) as request_clientid_ KEY,
bidquoteinfo_volume_
FROM ice_massquote_order
EMIT CHANGES
;
Это не помогло…
Вот объединенный поток:
CREATE STREAM enriched_orders AS
SELECT i.request_clientid_ as reqid, o.clientrequestid_ as orderreq FROM specific_order_typed i
INNER JOIN generic_order o WITHIN 1 HOURS ON o.clientrequestid_ = i.request_clientid_
EMIT CHANGES;
Также пытался поменять местами поток from и поток join … Результаты всегда нулевые:
|623562762 |null
...
При непосредственном выполнении запроса я получаю то, что ожидал
SELECT i.request_clientid_ as reqid, o.clientrequestid_ as orderreq FROM specific_order_typed i
INNER JOIN generic_order o WITHIN 1 HOURS ON o.clientrequestid_ = i.request_clientid_
EMIT CHANGES;
|623562762 |623562762
У кого-нибудь есть представление о том, что происходит? Я исчерпал всю идею, которая у меня была
Комментарии:
1. Являются
thetopic
ли фактические ключи сообщения строкой, которая содержитclientrequestid_
? ЕслиPRINT thetopic
вы должны увидеть ключевые значения.
Ответ №1:
Поэтому я решил свою проблему, понизив рейтинг до 0.10.2.
Первое отличие, которое я заметил, это то, что при попытке присоединиться к исходным потокам generic_orders
и specific_order
я получаю правильную ошибку для сравнения между string
и bigint
CREATE STREAM generic_orders (
...
clientrequestid_ string,
...
) WITH (
KAFKA_TOPIC='thetopic',
VALUE_FORMAT='AVRO'
);
CREATE STREAM specific_order (
originatoruserid_ string,
request_clientid_ bigint,
bidquoteinfo_volume_ bigint
) WITH (
KAFKA_TOPIC='anothertopic',
VALUE_FORMAT='AVRO'
);
Затем мне просто нужно было преобразовать bigint в строку (я уже пробовал это с 0.12).:
CREATE STREAM enriched_orders AS
SELECT * FROM specific_order_typed i
INNER JOIN generic_order o WITHIN 1 HOURS ON o.clientrequestid_ = CAST(i.request_clientid_ AS String)
EMIT CHANGES;
И, наконец, значения больше не равны нулю при чтении этого потока.