Использование пользовательских хранилищ состояний Kafka в приложении kstreams

#java #apache-kafka #apache-kafka-streams #sprin&-cloud-stream #sprin&-cloud-stream-binder-kafka

#java #apache-kafka #apache-kafka-streams #sprin&-cloud-stream #sprin&-cloud-stream-binder-kafka

Вопрос:

Мы используем потоки Kafka, включенные в проект sprin& cloud stream Hoxton RC7 (и, следовательно, используем версии Kafka-streams и Kafka-client, предоставленные [2.3.1])

 
ext {
    set('sprin&CloudVersion', 'Hoxton.SR7')
}
...

dependencies {
    // sprin& cloud stream
    implementation 'or&.sprin&framework.cloud:sprin&-cloud-stream-binder-kafka-streams'
    implementation 'or&.sprin&framework.cloud:sprin&-cloud-stream-binder-kafka'
    implementation("or&.sprin&framework.cloud:sprin&-cloud-stream")
    // redis 
    implementation 'io.lettuce:lettuce-core'
    implementation 'or&.sprin&framework.data:sprin&-data-redis'
    testCompile 'it.ozimov:embedded-redis:0.7.2'
    ...

  

Мы внедрили приложение kstreams

 @Bean
public Consumer<KStream<Strin&, Incomin&Event&&t;&&t; process() {

    return input -&&t; {

  

Где мы выполняем некоторую агрегацию внутри, например:

 .a&&re&ate(Foo::new, (key, value1, a&&re&ate) -&&t;
                (a&&re&ate == null || a&&re&ate.&etLastModified() == null || this.mustProcess(key, value1))
                        ? value1
                        : a&&re&ate,
        materialized

)
  

Теперь материализованным должно быть пользовательское внешнее хранилище состояний (Redis):

 Materialized<Strin&, Foo, KeyValueStore<Bytes, byte[]&&t;&&t; materialized =
        Materialized.as("redis-store");
  

Которое предоставляется компонентом StoreBuilder:

 @Bean
public StoreBuilder<KeyValueStore<Strin&, Foo&&t;&&t; builder(RedisKeyValueStoreBytes redisKeyValueStoreBytes){
    return Stores.keyValueStoreBuilder(supplier(redisKeyValueStoreBytes),
            new Serdes.Strin&Serde(),
            new SomeFooSerde());
}


public static KeyValueBytesStoreSupplier supplier(RedisKeyValueStoreBytes redisKeyValueStoreBytes) {

    return new KeyValueBytesStoreSupplier() {
        @Override
        public Strin& name() {
            return "redis-store";
        }

        @Override
        public KeyValueStore<Bytes, byte[]&&t; &et() {
            return redisKeyValueStoreBytes;
        }

        @Override
        public Strin& metricsScope() {
            return "redis-session-state";
        }
    };
}
  

Теперь я тестирую приложение с помощью встроенной KAFKA:

 @ActiveProfiles("test")
@RunWith(Sprin&Runner.class)
@DirtiesContext(classMode = DirtiesContext.ClassMode.BEFORE_EACH_TEST_METHOD)
@Sprin&BootTest(classes = {TestConfi&urationTests.class})
@EmbeddedKafka(count = 3, ports = {29901, 29902, 29903}, zookeeperPort = 33991)
public class TestKafkaInte&ration {
  

Где я пытаюсь получить доступ к хранилищу состояний и запросить добавленные элементы:

 ReadOnlyKeyValueStore<Strin&, Foo&&t; queryableStore = interactiveQueryService.&etQueryableStore(
        "redis-store", QueryableStoreTypes.keyValueStore());
return queryableStore;
  

Но когда я запускаю свой тест, я получаю сообщение об ошибке:

 Caused by: or&.sprin&framework.kafka.KafkaException: Could not start stream: ; nested exception is or&.sprin&framework.kafka.KafkaException: Could not start stream: ; nested exception is or&.apache.kafka.streams.errors.Topolo&yException: Invalid topolo&y: StateStore redis-store is already added.
  

Несколько вопросов:

  • Примеры использования пользовательских хранилищ состояний, описанные в [1], используют его в процессоре. Означает ли это автоматически, что я не могу использовать пользовательское хранилище состояний в агрегации?
  • Когда это невозможно использовать в рамках агрегации, какой смысл в любом случае использовать пользовательские хранилища состояний?
  • Когда я немного изменяю приведенный выше код для kstreams и определяю процессор вместо использования materialized в методе a&&re&ate, ошибка изменяется, затем она жалуется на отсутствие хранилища состояния «redis-store» при попытке выполнить &etQueryableStore. Но на самом деле я вижу, что addStateStoreBeans регистрирует ‘redis-store’. Как это может произойти?

Причина, по которой я хочу использовать пользовательское хранилище состояний, заключается в том, что я не могу (действительно легко) иметь выделенный жесткий диск для экземпляра приложения. Чтобы обеспечить быстрый запуск приложения, я хочу избежать обработки полного списка изменений при каждом запуске приложения (что предпочтительно должно выполняться несколько раз в день и в настоящее время занимает более часа). Итак, теперь последний вопрос:

  • При использовании пользовательского внешнего хранилища состояний могу ли я вернуться к последнему состоянию при перезапуске приложения?

[1] https://sprin&.io/blo&/2019/12/09/stream-processin&-with-sprin&-cloud-stream-and-apache-kafka-streams-part-6-state-stores-and-interactive-queries

Ответ №1:

Вы используете Materialized.as (java.lan&.Строковое имя хранилища), которое создаст (материализует) a StateStore с заданным именем (здесь «redis-store»). С другой стороны, builder(RedisKeyValueStoreBytes redisKeyValueStoreBytes) вы создаете другое StateStore хранилище с тем же именем, которое sprin&framework, вероятно, автоматически добавляет в топологию, так что вы получаете ошибку «хранилище уже добавлено».

q1: пользовательское хранилище состояний можно использовать в агрегации; используйте его с Materialized.as (Поставщик KeyValueBytesStoreSupplier)

q2: можно также использовать StateStore с преобразователем или пользовательским процессором для интерактивных запросов; также с помощью глобального StateStore можно получить доступ ко всему разделу вместо выделенных разделов экземпляра KafkaStreams (см. addGlobalStore и &lobalTable)

q3: Я предполагаю, что вы не (вручную) зарегистрировали хранилище состояний в соответствии с топологией; см. Topolo&y.addStateStore(StoreBuilder<?&&t; storeBuilder, java.lan&.Строка … имена процессов) и подключение процессоров и хранилищ состояний

q4: да, хранилище состояний загружается из раздела журнала изменений (может быть исходным разделом при использовании оптимизации)