Можно ли восстановить хранилище состояний потоков Кафки после перезапуска без использования разделов списка изменений?

#java #apache-kafka #apache-kafka-streams #spring-kafka #spring-cloud-stream

Вопрос:

У нас есть 2 сжатые темы, каждая из которых содержит терабайты данных, которые мы хотим объединить с помощью Spring Cloud Stream и потоков Кафки. (Упрощенный) код выглядит следующим образом:

 @Bean
public BiConsumer<KTable<String, LeftEvent>, KTable<String, RightEvent>> processEvents() {
    return ((leftEvents, rightEvents) -> {
      leftEvents.join(rightEvents, this::merge)
        .toStream()
        .foreach(this::process);
    });
}
 

Проблема с этим подходом заключается в том, что использование KTable s в качестве входных параметров приводит к созданию разделов списка изменений, которые по существу дублируют исходные темы, поскольку, как упоминалось выше, обе эти темы уже уплотнены. Чтобы избежать дублирования терабайт данных в Кафке, нашей первой попыткой было вместо этого использовать KStream s в качестве входных данных и преобразовать их в KTable s следующим образом:

 stream.toTable(
  Materialized
    .<K, V, KeyValueStore<Bytes, byte[]>>as(stateStoreName)
    .withLoggingDisabled()
);
 

тем самым отключив ведение журнала и, следовательно, отказавшись от тем журнала изменений, которые в нашем контексте кажутся бесполезными.

Однако следующий сценарий теперь больше не работает:

  1. Сгенерируйте LeftEvent с помощью ключа k1
  2. Перезапустите приложение
  3. Сгенерируйте RightEvent с помощью ключа k1

События больше не объединяются, хотя объединение работает нормально, если приложение не перезапускается в промежутке (т. Е. шаг 1, затем 3).

Когда приложение перезапускается, мы ожидали бы, что хранилища состояний будут восстановлены из разделов souce в отсутствие разделов списка изменений, но, по-видимому, это не так. В некоторых случаях мы заметили, что файлы RocksDB (расположенные в /tmp/kafka-streams/... ) использовались для извлечения данных, использованных до перезапуска, однако мы не можем предположить, что эти файлы все еще будут доступны после перезапуска, поскольку мы работаем в контейнерной среде.

Есть ли способ поддерживать перезапуски (и добиться отказоустойчивости) без необходимости использовать разделы журнала изменений, которые в нашем случае дублируют разделы ввода? Если нет, нам, возможно, придется пересмотреть наше использование потоков Кафки…

Ответ №1:

Вы хотите включить оптимизацию потоков Кафки: https://docs.confluent.io/platform/current/streams/developer-guide/optimizing-streams.html#optimization-details (№1-это то, что вы ищете).

В настоящее время существует две оптимизации, которые потоки Кафки выполняют при включении:

  1. Исходная таблица KTable повторно использует исходный раздел в качестве раздела списка изменений.
  2. Когда это возможно, потоки Кафки сворачивают несколько тем перераспределения в одну тему перераспределения.

Главное, на что следует обратить внимание, поскольку я сам допустил эту ошибку, — не забудьте отправить конфигурацию как в сборку (), так и в построение KStreams (оптимизация, как указано здесь по предоставленной ссылке) выполняется в сборке.

 // tell Kafka Streams to optimize the topology
config.setProperty(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.OPTIMIZE);

// Since we've configured Streams to use optimizations, the topology is optimized during the build.
// And because optimizations are enabled, the resulting topology will no longer need to perform
// three explicit repartitioning steps, but only one.
final Topology topology = builder.build(config);
final KafkaStreams streams = new KafkaStreams(topology, config);
 

Теперь оптимизация включена для всей топологии, поэтому, если вы помните об этом, оптимизация № 2 также выполняется.