#java #apache-kafka #apache-kafka-streams
Вопрос:
При передаче массовых данных через приложение kafka-steams я много раз вижу, как оно регистрирует следующее сообщение…
WARN org.apache.kafka.streams.state.internals.AbstractRocksDBSegmentedBytesStore - Skipping record for expired segment.
…и данные, которые, как я ожидаю, были объединены с помощью шага соединения слева, похоже, потеряны.
Я видел это на практике либо когда мое приложение было закрыто на некоторое время, а затем восстановлено, либо когда я использовал что-то вроде инструмента сброса приложения в попытке заставить приложение повторно обработать прошлые данные.
Я смог воспроизвести это поведение изолированно, создав 1000 сообщений на две темы, расположенные на расстоянии часа друг от друга (с исходными метками времени в порядке), затем попросив потоки кафки выбрать для них ключ и попытаться соединить два обновленных потока.
Автономный исходный код для этого воспроизведения доступен по адресу https://github.com/mattsheppard/ins14809/blob/main/src/test/java/ins14809/Ins14809Test.java
Фактическая топология потоков кафки там выглядит так.
final StreamsBuilder builder = new StreamsBuilder();
final KStream<String, String> leftStream = builder.stream(leftTopic);
final KStream<String, String> rightStream = builder.stream(rightTopic);
final KStream<String, String> rekeyedLeftStream = leftStream
.selectKey((k, v) -> v.substring(0, v.indexOf(":")));
final KStream<String, String> rekeyedRightStream = rightStream
.selectKey((k, v) -> v.substring(0, v.indexOf(":")));
JoinWindows joinWindow = JoinWindows.of(Duration.ofSeconds(5));
final KStream<String, String> joined = rekeyedLeftStream.leftJoin(
rekeyedRightStream,
(left, right) -> left "/" right,
joinWindow
);
…и конечный результат, который я получаю, выглядит так…
...
523 [523,left/null]
524 [524,left/null, 524,left/524,right]
525 [525,left/525,right]
526 [526,left/null]
527 [527,left/null]
528 [528,left/528,right]
529 [529,left/null]
530 [530,left/null]
531 [531,left/null, 531,left/531,right]
532 [532,left/null]
533 [533,left/null]
534 [534,left/null, 534,left/534,right]
535 [535,left/null]
536 [536,left/null]
537 [537,left/null, 537,left/537,right]
538 [538,left/null]
539 [539,left/null]
540 [540,left/null]
541 [541,left/null]
542 [542,left/null]
543 [543,left/null]
...
…где, учитывая входные данные, я ожидаю, что каждая строка будет заканчиваться соединением двух значений, а правильное значение будет равно нулю.
(Обратите внимание, что это нормально/ожидалось, что мы изначально получим значения left/null для каждого значения — такова ожидаемая семантика соединения потоков Кафки слева, как я это понимаю.)
Я заметил, что если я установлю очень большое значение grace в окне объединения, проблема будет решена, но, поскольку ввод, который я предоставляю, не вышел из строя, я не ожидал, что мне понадобится это делать, и я устал от требований к ресурсам, которые предъявляются на практике к приложению с большим объемом.
Я подозреваю, что происходит что-то такое, что при обработке одного раздела время потока переносится на самое новое сообщение в этом разделе, то есть, когда затем проверяется следующий раздел, оказывается, что он содержит много записей, которые «слишком стары» по сравнению со временем потока. Я надеюсь, однако, что кто-нибудь может указать мне на настройку для изменения этого поведения или какое-либо другое решение, которое позволит избежать получения неточных результатов всякий раз, когда приложение справляется с накоплением данных, не накладывая больших накладных расходов на производительность.
Комментарии:
1. ваше подозрение верно, но, похоже, это просто нормальное поведение JoinWindows, если я правильно понял вашу проблему, я опубликовал более подробный ответ об этом.
2. Поднял этот вопрос как issues.apache.org/jira/browse/KAFKA-13289 поскольку это, по-видимому, является основной проблемой в самих потоках кафки.
Ответ №1:
Вы публикуете сообщения в течение 1 часа, а затем он не присоединяется?
Прямо сейчас вы используете:
JoinWindows joinWindow = JoinWindows.of(Duration.ofSeconds(5));
изменение этого числа на большее или добавление льготного периода позволит вам обрабатывать гораздо больше сообщений, при этом сообщения будут находиться с интервалом в 1 час, а у вас будет 1000 сообщений, поэтому значение:
JoinWindows joinWindow = JoinWindows.of(Duration.ofSeconds(5)).grace(Duration.ofDays(42));
Это необходимо. (так как 1000 * 5 часов-это около 42 дней)
Поэтому вам необходимо настроить это значение в соответствии с размером ваших данных, чтобы всегда иметь возможность выполнять эту операцию со всеми ожидаемыми сообщениями.
Тогда я получу результаты, которых вы ожидаете… Я так думаю. Поскольку я не совсем уверен в других нулях здесь, но вы, кажется, говорите, что это ожидаемо — не анализировали эту часть. Поскольку у некоторых людей это есть, у других этого нет.
11 [11:left/null, 11:left/11:right]
12 [12:left/12:right]
13 [13:left/null, 13:left/13:right]
14 [14:left/null, 14:left/14:right]
15 [15:left/null, 15:left/15:right]
16 [16:left/null, 16:left/16:right]
17 [17:left/17:right]
18 [18:left/null, 18:left/18:right]
19 [19:left/null, 19:left/19:right]
20 [20:left/null, 20:left/20:right]
21 [21:left/null, 21:left/21:right]
22 [22:left/null, 22:left/22:right]
23 [23:left/null, 23:left/23:right]
24 [24:left/null, 24:left/24:right]
25 [25:left/25:right]
26 [26:left/null, 26:left/26:right]
Но всегда есть допустимая пара, заданная для всех 1000 результатов.
Вам нужны более старые данные, поэтому вы должны согласиться на более старые данные.
но, как я понимаю, иметь очень большой льготный период было бы дорого
Вероятно, это будет дорого стоить, если ваш льготный период будет намного больше, чем вам нужно, но в данном случае это именно то, что вам нужно. Если вы не можете просто избежать этого объединения в целом.
как вы можете видеть в документации, благодать делает именно то, что вы хотите (или, скорее, то, что вы не хотите, с низким значением по умолчанию): https://kafka.apache.org/22/javadoc/org/apache/kafka/streams/kstream/JoinWindows.html#grace-java.time.Продолжительность-
Отклоняйте поздние события, которые приходят позже, чем после окончания окна, после окончания его окна. Задержка определяется как (stream_time — метка времени записи).
Альтенативное решение состоит в том, чтобы использовать большее окно, но это не похоже на правильное решение для вашего случая:
JoinWindows joinWindow = JoinWindows.of(Duration.ofDays(42);
Комментарии:
1. Спасибо — Я думаю, я ищу решение, которое не «устанавливает массивное окно соединения/благодати», а скорее использует преимущества упорядоченного ввода, поскольку приложение имеет большой объем и в основном работает в непрерывном потоковом режиме. Я подумал, что, если я смогу программно каким-то образом сократить отставание промежуточных тем, возможно, у меня получится что-то, что будет пропускать ввод со скоростью, которую приложение может надежно обрабатывать без огромного окна.
2. @MattSheppard это немного похоже на предварительную оптимизацию, вы уверены, что вам нужно что-то еще? У вас просто очень разнесенные данные, обычно их много событий в минуту, а не 1 в час, поэтому окно присоединения в 40 дней с 20 событиями для minuite наверняка будет тяжелым. Но здесь? Я бы просто использовал его и измерил, если возникнут какие-либо проблемы. И тогда я бы также задал другой вопрос в стеке, о другом способе обработки данных, а не о том, чтобы исправить соединение.
3. На практике в моем приложении происходят тысячи событий в секунду, и эта же проблема возникает, если я выключаю приложение на час, а затем снова включаю его, когда оно пытается наверстать упущенное — пример здесь просто раскрывает проблему, которую я вижу простым способом, без усложнения остальных приложений. Боюсь, что иметь окно благодати такого размера, чтобы как можно дольше отключать приложение, непрактично. Но похоже, что настоящий ответ заключается в том, что потоки Кафки на самом деле не подходят для моей цели в настоящее время. Жаль, но ладно.
4. @MattSheppard Не уверен, есть ли лучшее решение от кафки, но наверняка есть запасной экземпляр или просто что-то, чтобы увеличить льготный период только тогда, когда приложение начнет покрывать пропущенные части