Массовая обработка данных с помощью соединения с потоками кафки приводит к «Пропуску записи для сегмента с истекшим сроком действия»

#java #apache-kafka #apache-kafka-streams

Вопрос:

При передаче массовых данных через приложение kafka-steams я много раз вижу, как оно регистрирует следующее сообщение…

WARN org.apache.kafka.streams.state.internals.AbstractRocksDBSegmentedBytesStore - Skipping record for expired segment.

…и данные, которые, как я ожидаю, были объединены с помощью шага соединения слева, похоже, потеряны.

Я видел это на практике либо когда мое приложение было закрыто на некоторое время, а затем восстановлено, либо когда я использовал что-то вроде инструмента сброса приложения в попытке заставить приложение повторно обработать прошлые данные.

Я смог воспроизвести это поведение изолированно, создав 1000 сообщений на две темы, расположенные на расстоянии часа друг от друга (с исходными метками времени в порядке), затем попросив потоки кафки выбрать для них ключ и попытаться соединить два обновленных потока.

Автономный исходный код для этого воспроизведения доступен по адресу https://github.com/mattsheppard/ins14809/blob/main/src/test/java/ins14809/Ins14809Test.java

Фактическая топология потоков кафки там выглядит так.

             final StreamsBuilder builder = new StreamsBuilder();
            final KStream<String, String> leftStream = builder.stream(leftTopic);
            final KStream<String, String> rightStream = builder.stream(rightTopic);

            final KStream<String, String> rekeyedLeftStream = leftStream
                    .selectKey((k, v) -> v.substring(0, v.indexOf(":")));

            final KStream<String, String> rekeyedRightStream = rightStream
                    .selectKey((k, v) -> v.substring(0, v.indexOf(":")));

            JoinWindows joinWindow = JoinWindows.of(Duration.ofSeconds(5));

            final KStream<String, String> joined = rekeyedLeftStream.leftJoin(
                    rekeyedRightStream,
                    (left, right) -> left   "/"   right,
                    joinWindow
            );
 

…и конечный результат, который я получаю, выглядит так…

 ...
523 [523,left/null]
524 [524,left/null, 524,left/524,right]
525 [525,left/525,right]
526 [526,left/null]
527 [527,left/null]
528 [528,left/528,right]
529 [529,left/null]
530 [530,left/null]
531 [531,left/null, 531,left/531,right]
532 [532,left/null]
533 [533,left/null]
534 [534,left/null, 534,left/534,right]
535 [535,left/null]
536 [536,left/null]
537 [537,left/null, 537,left/537,right]
538 [538,left/null]
539 [539,left/null]
540 [540,left/null]
541 [541,left/null]
542 [542,left/null]
543 [543,left/null]
...
 

…где, учитывая входные данные, я ожидаю, что каждая строка будет заканчиваться соединением двух значений, а правильное значение будет равно нулю.

(Обратите внимание, что это нормально/ожидалось, что мы изначально получим значения left/null для каждого значения — такова ожидаемая семантика соединения потоков Кафки слева, как я это понимаю.)

Я заметил, что если я установлю очень большое значение grace в окне объединения, проблема будет решена, но, поскольку ввод, который я предоставляю, не вышел из строя, я не ожидал, что мне понадобится это делать, и я устал от требований к ресурсам, которые предъявляются на практике к приложению с большим объемом.

Я подозреваю, что происходит что-то такое, что при обработке одного раздела время потока переносится на самое новое сообщение в этом разделе, то есть, когда затем проверяется следующий раздел, оказывается, что он содержит много записей, которые «слишком стары» по сравнению со временем потока. Я надеюсь, однако, что кто-нибудь может указать мне на настройку для изменения этого поведения или какое-либо другое решение, которое позволит избежать получения неточных результатов всякий раз, когда приложение справляется с накоплением данных, не накладывая больших накладных расходов на производительность.

Комментарии:

1. ваше подозрение верно, но, похоже, это просто нормальное поведение JoinWindows, если я правильно понял вашу проблему, я опубликовал более подробный ответ об этом.

2. Поднял этот вопрос как issues.apache.org/jira/browse/KAFKA-13289 поскольку это, по-видимому, является основной проблемой в самих потоках кафки.

Ответ №1:

Вы публикуете сообщения в течение 1 часа, а затем он не присоединяется?
Прямо сейчас вы используете:

 JoinWindows joinWindow = JoinWindows.of(Duration.ofSeconds(5));
 

изменение этого числа на большее или добавление льготного периода позволит вам обрабатывать гораздо больше сообщений, при этом сообщения будут находиться с интервалом в 1 час, а у вас будет 1000 сообщений, поэтому значение:

 JoinWindows joinWindow = JoinWindows.of(Duration.ofSeconds(5)).grace(Duration.ofDays(42));
 

Это необходимо. (так как 1000 * 5 часов-это около 42 дней)
Поэтому вам необходимо настроить это значение в соответствии с размером ваших данных, чтобы всегда иметь возможность выполнять эту операцию со всеми ожидаемыми сообщениями.

Тогда я получу результаты, которых вы ожидаете… Я так думаю. Поскольку я не совсем уверен в других нулях здесь, но вы, кажется, говорите, что это ожидаемо — не анализировали эту часть. Поскольку у некоторых людей это есть, у других этого нет.

 
 11 [11:left/null, 11:left/11:right]
 12 [12:left/12:right]
 13 [13:left/null, 13:left/13:right]
 14 [14:left/null, 14:left/14:right]
 15 [15:left/null, 15:left/15:right]
 16 [16:left/null, 16:left/16:right]
 17 [17:left/17:right]
 18 [18:left/null, 18:left/18:right]
 19 [19:left/null, 19:left/19:right]
 20 [20:left/null, 20:left/20:right]
 21 [21:left/null, 21:left/21:right]
 22 [22:left/null, 22:left/22:right]
 23 [23:left/null, 23:left/23:right]
 24 [24:left/null, 24:left/24:right]
 25 [25:left/25:right]
 26 [26:left/null, 26:left/26:right]
 

Но всегда есть допустимая пара, заданная для всех 1000 результатов.

Вам нужны более старые данные, поэтому вы должны согласиться на более старые данные.

но, как я понимаю, иметь очень большой льготный период было бы дорого

Вероятно, это будет дорого стоить, если ваш льготный период будет намного больше, чем вам нужно, но в данном случае это именно то, что вам нужно. Если вы не можете просто избежать этого объединения в целом.
как вы можете видеть в документации, благодать делает именно то, что вы хотите (или, скорее, то, что вы не хотите, с низким значением по умолчанию): https://kafka.apache.org/22/javadoc/org/apache/kafka/streams/kstream/JoinWindows.html#grace-java.time.Продолжительность-

Отклоняйте поздние события, которые приходят позже, чем после окончания окна, после окончания его окна. Задержка определяется как (stream_time — метка времени записи).

Альтенативное решение состоит в том, чтобы использовать большее окно, но это не похоже на правильное решение для вашего случая:

 JoinWindows joinWindow = JoinWindows.of(Duration.ofDays(42);
 

Комментарии:

1. Спасибо — Я думаю, я ищу решение, которое не «устанавливает массивное окно соединения/благодати», а скорее использует преимущества упорядоченного ввода, поскольку приложение имеет большой объем и в основном работает в непрерывном потоковом режиме. Я подумал, что, если я смогу программно каким-то образом сократить отставание промежуточных тем, возможно, у меня получится что-то, что будет пропускать ввод со скоростью, которую приложение может надежно обрабатывать без огромного окна.

2. @MattSheppard это немного похоже на предварительную оптимизацию, вы уверены, что вам нужно что-то еще? У вас просто очень разнесенные данные, обычно их много событий в минуту, а не 1 в час, поэтому окно присоединения в 40 дней с 20 событиями для minuite наверняка будет тяжелым. Но здесь? Я бы просто использовал его и измерил, если возникнут какие-либо проблемы. И тогда я бы также задал другой вопрос в стеке, о другом способе обработки данных, а не о том, чтобы исправить соединение.

3. На практике в моем приложении происходят тысячи событий в секунду, и эта же проблема возникает, если я выключаю приложение на час, а затем снова включаю его, когда оно пытается наверстать упущенное — пример здесь просто раскрывает проблему, которую я вижу простым способом, без усложнения остальных приложений. Боюсь, что иметь окно благодати такого размера, чтобы как можно дольше отключать приложение, непрактично. Но похоже, что настоящий ответ заключается в том, что потоки Кафки на самом деле не подходят для моей цели в настоящее время. Жаль, но ладно.

4. @MattSheppard Не уверен, есть ли лучшее решение от кафки, но наверняка есть запасной экземпляр или просто что-то, чтобы увеличить льготный период только тогда, когда приложение начнет покрывать пропущенные части