#java #apache-kafka #apache-kafka-streams #spring-kafka #spring-cloud-stream
Вопрос:
У нас есть 2 сжатые темы, каждая из которых содержит терабайты данных, которые мы хотим объединить с помощью Spring Cloud Stream и потоков Кафки. (Упрощенный) код выглядит следующим образом:
@Bean
public BiConsumer<KTable<String, LeftEvent>, KTable<String, RightEvent>> processEvents() {
return ((leftEvents, rightEvents) -> {
leftEvents.join(rightEvents, this::merge)
.toStream()
.foreach(this::process);
});
}
Проблема с этим подходом заключается в том, что использование KTable
s в качестве входных параметров приводит к созданию разделов списка изменений, которые по существу дублируют исходные темы, поскольку, как упоминалось выше, обе эти темы уже уплотнены. Чтобы избежать дублирования терабайт данных в Кафке, нашей первой попыткой было вместо этого использовать KStream
s в качестве входных данных и преобразовать их в KTable
s следующим образом:
stream.toTable(
Materialized
.<K, V, KeyValueStore<Bytes, byte[]>>as(stateStoreName)
.withLoggingDisabled()
);
тем самым отключив ведение журнала и, следовательно, отказавшись от тем журнала изменений, которые в нашем контексте кажутся бесполезными.
Однако следующий сценарий теперь больше не работает:
- Сгенерируйте
LeftEvent
с помощью ключаk1
- Перезапустите приложение
- Сгенерируйте
RightEvent
с помощью ключаk1
События больше не объединяются, хотя объединение работает нормально, если приложение не перезапускается в промежутке (т. Е. шаг 1, затем 3).
Когда приложение перезапускается, мы ожидали бы, что хранилища состояний будут восстановлены из разделов souce в отсутствие разделов списка изменений, но, по-видимому, это не так. В некоторых случаях мы заметили, что файлы RocksDB (расположенные в /tmp/kafka-streams/...
) использовались для извлечения данных, использованных до перезапуска, однако мы не можем предположить, что эти файлы все еще будут доступны после перезапуска, поскольку мы работаем в контейнерной среде.
Есть ли способ поддерживать перезапуски (и добиться отказоустойчивости) без необходимости использовать разделы журнала изменений, которые в нашем случае дублируют разделы ввода? Если нет, нам, возможно, придется пересмотреть наше использование потоков Кафки…
Ответ №1:
Вы хотите включить оптимизацию потоков Кафки: https://docs.confluent.io/platform/current/streams/developer-guide/optimizing-streams.html#optimization-details (№1-это то, что вы ищете).
В настоящее время существует две оптимизации, которые потоки Кафки выполняют при включении:
- Исходная таблица KTable повторно использует исходный раздел в качестве раздела списка изменений.
- Когда это возможно, потоки Кафки сворачивают несколько тем перераспределения в одну тему перераспределения.
Главное, на что следует обратить внимание, поскольку я сам допустил эту ошибку, — не забудьте отправить конфигурацию как в сборку (), так и в построение KStreams (оптимизация, как указано здесь по предоставленной ссылке) выполняется в сборке.
// tell Kafka Streams to optimize the topology
config.setProperty(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.OPTIMIZE);
// Since we've configured Streams to use optimizations, the topology is optimized during the build.
// And because optimizations are enabled, the resulting topology will no longer need to perform
// three explicit repartitioning steps, but only one.
final Topology topology = builder.build(config);
final KafkaStreams streams = new KafkaStreams(topology, config);
Теперь оптимизация включена для всей топологии, поэтому, если вы помните об этом, оптимизация № 2 также выполняется.