KStream join запускает функцию соединения мгновенно, как отложить ее в конце окна?

#java #join #spark-streaming #apache-kafka-streams

#java #Присоединиться #искровая передача #apache-kafka-streams

Вопрос:

Как объясняется во всеобъемлющей статье Пересечение потоков. Внешний KStream-KStream Join отправляет элемент, как только он поступает, даже до ожидания его соответствия в другом K-потоке. Недостатком этого является то, что оно дублирует событие not-joined вместе с каждым присоединенным событием.

Можете ли вы предложить какой-либо альтернативный способ реализации объединения событий без дублирования (как во внешнем соединении) или отсутствия (как во внутреннем соединении)?


Согласно тому же примеру событий просмотра по щелчку:

 KStream<String, JsonNode> joinedEventsStream = 
     clickEventsStream.outerJoin(viewEventsStream,
            (clickEvent, viewEvent) -> processJoin(clickEvent, viewEvent),/* Fire quickly if match found,*/
                                                                          /* else fire after 2 seconds */
            JoinWindows.of(Duration.ofSeconds(2L)), StreamJoined.with(Serdes.String(), jsonSerde, jsonSerde)
    );
  

Ожидаемые результаты объясняются ниже:

  • событие щелчка поступает через 1 секунду после просмотра событий, связанных (A, A)
  • событие щелчка поступает через 11 секунд после просмотра — разные события для каждого. Каждый через 2 секунды (размер окна) после его прибытия.(B, null) (null,B)
  • Событие просмотра приходит через 1 секунду после щелчка — Объединенные события (C, C)
  • есть событие просмотра, но нет события click — Not-joined через 2 секунды после его прибытия (D, null)
  • есть событие щелчка, но нет события просмотра, не присоединенного через 2 секунды после его прибытия (null, E)

ожидаемое внешнее соединение

Комментарии:

1. В последнем абзаце на этой странице говорится, что kafka streams не поддерживает это!

Ответ №1:

Atm (Kafka 2.7.0) поведение соответствует описанию в сообщении в блоге. Этот вопрос возникал уже несколько раз, и недавно мы создали тикет, чтобы изменить поведение:https://issues.apache.org/jira/browse/KAFKA-10847

В конце концов, вы могли бы использовать последующую операцию с отслеживанием состояния после объединения, чтобы буферизировать записи до тех пор, пока не будет достигнут конец окна (или, может быть, лучше, закрытие окна, т. Е. Конец окна плюс льготный период). Это позволяет вам отфильтровывать ложный результат соединения влево / наружу.