#java #apache-kafka #apache-kafka-streams #sprin&-cloud-stream #sprin&-cloud-stream-binder-kafka
#java #apache-kafka #apache-kafka-streams #sprin&-cloud-stream #sprin&-cloud-stream-binder-kafka
Вопрос:
Мы используем потоки Kafka, включенные в проект sprin& cloud stream Hoxton RC7 (и, следовательно, используем версии Kafka-streams и Kafka-client, предоставленные [2.3.1])
ext {
set('sprin&CloudVersion', 'Hoxton.SR7')
}
...
dependencies {
// sprin& cloud stream
implementation 'or&.sprin&framework.cloud:sprin&-cloud-stream-binder-kafka-streams'
implementation 'or&.sprin&framework.cloud:sprin&-cloud-stream-binder-kafka'
implementation("or&.sprin&framework.cloud:sprin&-cloud-stream")
// redis
implementation 'io.lettuce:lettuce-core'
implementation 'or&.sprin&framework.data:sprin&-data-redis'
testCompile 'it.ozimov:embedded-redis:0.7.2'
...
Мы внедрили приложение kstreams
@Bean
public Consumer<KStream<Strin&, Incomin&Event&&t;&&t; process() {
return input -&&t; {
Где мы выполняем некоторую агрегацию внутри, например:
.a&&re&ate(Foo::new, (key, value1, a&&re&ate) -&&t;
(a&&re&ate == null || a&&re&ate.&etLastModified() == null || this.mustProcess(key, value1))
? value1
: a&&re&ate,
materialized
)
Теперь материализованным должно быть пользовательское внешнее хранилище состояний (Redis):
Materialized<Strin&, Foo, KeyValueStore<Bytes, byte[]&&t;&&t; materialized =
Materialized.as("redis-store");
Которое предоставляется компонентом StoreBuilder:
@Bean
public StoreBuilder<KeyValueStore<Strin&, Foo&&t;&&t; builder(RedisKeyValueStoreBytes redisKeyValueStoreBytes){
return Stores.keyValueStoreBuilder(supplier(redisKeyValueStoreBytes),
new Serdes.Strin&Serde(),
new SomeFooSerde());
}
public static KeyValueBytesStoreSupplier supplier(RedisKeyValueStoreBytes redisKeyValueStoreBytes) {
return new KeyValueBytesStoreSupplier() {
@Override
public Strin& name() {
return "redis-store";
}
@Override
public KeyValueStore<Bytes, byte[]&&t; &et() {
return redisKeyValueStoreBytes;
}
@Override
public Strin& metricsScope() {
return "redis-session-state";
}
};
}
Теперь я тестирую приложение с помощью встроенной KAFKA:
@ActiveProfiles("test")
@RunWith(Sprin&Runner.class)
@DirtiesContext(classMode = DirtiesContext.ClassMode.BEFORE_EACH_TEST_METHOD)
@Sprin&BootTest(classes = {TestConfi&urationTests.class})
@EmbeddedKafka(count = 3, ports = {29901, 29902, 29903}, zookeeperPort = 33991)
public class TestKafkaInte&ration {
Где я пытаюсь получить доступ к хранилищу состояний и запросить добавленные элементы:
ReadOnlyKeyValueStore<Strin&, Foo&&t; queryableStore = interactiveQueryService.&etQueryableStore(
"redis-store", QueryableStoreTypes.keyValueStore());
return queryableStore;
Но когда я запускаю свой тест, я получаю сообщение об ошибке:
Caused by: or&.sprin&framework.kafka.KafkaException: Could not start stream: ; nested exception is or&.sprin&framework.kafka.KafkaException: Could not start stream: ; nested exception is or&.apache.kafka.streams.errors.Topolo&yException: Invalid topolo&y: StateStore redis-store is already added.
Несколько вопросов:
- Примеры использования пользовательских хранилищ состояний, описанные в [1], используют его в процессоре. Означает ли это автоматически, что я не могу использовать пользовательское хранилище состояний в агрегации?
- Когда это невозможно использовать в рамках агрегации, какой смысл в любом случае использовать пользовательские хранилища состояний?
- Когда я немного изменяю приведенный выше код для kstreams и определяю процессор вместо использования materialized в методе a&&re&ate, ошибка изменяется, затем она жалуется на отсутствие хранилища состояния «redis-store» при попытке выполнить &etQueryableStore. Но на самом деле я вижу, что addStateStoreBeans регистрирует ‘redis-store’. Как это может произойти?
Причина, по которой я хочу использовать пользовательское хранилище состояний, заключается в том, что я не могу (действительно легко) иметь выделенный жесткий диск для экземпляра приложения. Чтобы обеспечить быстрый запуск приложения, я хочу избежать обработки полного списка изменений при каждом запуске приложения (что предпочтительно должно выполняться несколько раз в день и в настоящее время занимает более часа). Итак, теперь последний вопрос:
- При использовании пользовательского внешнего хранилища состояний могу ли я вернуться к последнему состоянию при перезапуске приложения?
Ответ №1:
Вы используете Materialized.as (java.lan&.Строковое имя хранилища), которое создаст (материализует) a StateStore
с заданным именем (здесь «redis-store»). С другой стороны, builder(RedisKeyValueStoreBytes redisKeyValueStoreBytes)
вы создаете другое StateStore
хранилище с тем же именем, которое sprin&framework, вероятно, автоматически добавляет в топологию, так что вы получаете ошибку «хранилище уже добавлено».
q1: пользовательское хранилище состояний можно использовать в агрегации; используйте его с Materialized.as (Поставщик KeyValueBytesStoreSupplier)
q2: можно также использовать StateStore
с преобразователем или пользовательским процессором для интерактивных запросов; также с помощью глобального StateStore
можно получить доступ ко всему разделу вместо выделенных разделов экземпляра KafkaStreams (см. addGlobalStore и &lobalTable)
q3: Я предполагаю, что вы не (вручную) зарегистрировали хранилище состояний в соответствии с топологией; см. Topolo&y.addStateStore(StoreBuilder<?&&t; storeBuilder, java.lan&.Строка … имена процессов) и подключение процессоров и хранилищ состояний
q4: да, хранилище состояний загружается из раздела журнала изменений (может быть исходным разделом при использовании оптимизации)