Исключение RocksDB в потоках Kafka

#lambda #apache-kafka #kafka-consumer-api #apache-kafka-streams

#лямбда #apache-kafka #kafka-consumer-api #apache-kafka-streams

Вопрос:

В простой программе Kafka Stream, когда я использую приведенный ниже код, он работает без каких-либо ошибок:

       KTable<String, Long> result= source.mapValues(textLine
      ->textLine.toLowerCase()) .flatMapValues(lowercasedTextLine ->
      Arrays.asList(lowercasedTextLine.split(" "))) .selectKey((ignoredKey,word) ->
      word) .groupByKey() .count("Counts");

      result.to(Serdes.String(), Serdes.Long(), "wc-output");
  

Однако, когда я использую приведенный ниже код, я получаю ошибку:

     KStream<String, String> source = builder.stream("wc-input");
    source.groupBy((key, word) -> word).windowedBy(TimeWindows.of(TimeUnit.SECONDS.toMillis(5000))).count()
            .toStream().map((key, value) -> new KeyValue<>(key.key(), value))
            .to("wc-output", Produced.with(Serdes.String(), Serdes.Long()));
  

Исключение в потоке
«потоки-количество слов-b160d715-f0e0-42ee-831e-0e4eed7e9424-StreamThread-1»
org.apache.kafka.streams.ошибки.StreamsException: исключение, пойманное в
процесс. Идентификатор задачи =1_0, процессор=KSTREAM-SOURCE-0000000006,
тема=потоки-количество слов-KSTREAM-ХРАНИЛИЩЕ АГРЕГИРОВАННОГО СОСТОЯНИЯ-0000000002-перераспределение,
раздел = 0, смещение = 0 в
org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:232)
в
org.apache.kafka.streams.processor.internals.Назначенные задачи.process(AssignedTasks.java:403)
в
org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:317)
в
org.apache.kafka.streams.processor.internals.StreamThread.processAndMaybeCommit(StreamThread.java:942)
в
org.apache.kafka.streams.processor.internals.StreamThread.RunOnce(StreamThread.java:822)
в
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:774)
в
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:744)
Вызвано: org.apache.kafka.streams.ошибки.Исключение ProcessorStateException:
Ошибка при открытии хранилища
KSTREAM-ХРАНИЛИЩЕ АГРЕГИРОВАННОГО СОСТОЯНИЯ-0000000002:1553472000000 в местоположении
tmpkafka-streamsstreams-wordcount1_0KSTREAM-AGGREGATE-STATE-STORE-0000000002KSTREAM-AGGREGATE-STATE-STORE-0000000002:1553472000000
в
org.apache.kafka.streams.state.internals.RocksDBStore.openDB(RocksDBStore.java:204)
в
org.apache.kafka.streams.state.internals.RocksDBStore.openDB(RocksDBStore.java:174)
в
org.apache.kafka.streams.state.internals.Segment.openDB(Segment.java:40)
в
org.apache.kafka.streams.state.internals.Segments.getOrCreateSegment(Сегменты.java:89)
в
org.apache.kafka.streams.state.internals.RocksDBSegmentedBytesStore.put(RocksDBSegmentedBytesStore.java:81)
в
org.apache.kafka.streams.state.internals.RocksDBWindowStore$RocksDBWindowBytesStore.put(RocksDBWindowStore.java:43)
в
org.apache.kafka.streams.state.internals.RocksDBWindowStore$RocksDBWindowBytesStore.put(RocksDBWindowStore.java:34)
в
org.apache.kafka.streams.state.internals.Хранилище изменений windowbytesstore.put(хранилище изменений windowbytesstore.java:67)
в
org.apache.kafka.streams.state.internals.Хранилище изменений windows.put(хранилище изменений windows.java:33)
в
org.apache.kafka.streams.state.internals.CachingWindowStore $ 1.применить (CachingWindowStore.java:100)
в
org.apache.kafka.streams.state.internals.NamedCache.сброс (NamedCache.java:141)
в
org.apache.kafka.streams.state.internals.NamedCache.выселить(NamedCache.java:232)
в
org.apache.kafka.streams.state.internals.ThreadCache.maybeEvict(ThreadCache.java:245)
в
org.apache.kafka.streams.state.internals.ThreadCache.put(ThreadCache.java:153)
в
org.apache.kafka.streams.state.internals.Кэширование windowstore.put(кэширование windowstore.java:157)
в
org.apache.kafka.streams.state.internals.Кэширование windowstore.put(кэширование windowstore.java:36)
в
org.apache.kafka.streams.state.internals.Измеренный windowstore.put(измеренный windowstore.java:96)
в
org.apache.kafka.streams.kstream.internals.KStreamWindowAggregate$KStreamWindowAggregateProcessor.process(KStreamWindowAggregate.java:122)
в
org.apache.kafka.streams.processor.internals.ProcessorNode$1.выполнить(ProcessorNode.java:46)
в
org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:208)
в
org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:124)
в
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:85)
в
org.apache.kafka.streams.processor.internals.Исходный код.process(исходный код.java:80)
в
org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:216)
… Еще 6 вызвано: org.rocksdb.RocksDBException: Не удалось создать
реж:
H:tmpkafka-streamsstreams-wordcount1_0KSTREAM-AGGREGATE-STATE-STORE-0000000002KSTREAM-AGGREGATE-STATE-STORE-0000000002:1553472000000:
Недопустимый аргумент в org.rocksdb.RocksDB.open (собственный метод) в
org.rocksdb.RocksDB.open(RocksDB.java:231) в
org.apache.kafka.streams.state.internals.RocksDBStore.openDB(RocksDBStore.java:197)

Комментарии:

1. пожалуйста, улучшите форматирование вашего кода. Это облегчит чтение и ответ на ваш вопрос.

2. похоже, что Kafka Streams не может создать каталог хранилища состояний. Вы могли бы попробовать изменить каталог state store на другой путь и посмотреть, решит ли это вашу проблему.

3. Какую версию потоков Kafka вы используете?

4. Это все еще происходит в Centos 7 с версией Kafka 2.1.0

Ответ №1:

При использовании оконной агрегации хранилища a называются по-другому, и в Kafka есть ошибка, 1.0.0 влияющая на ОС Windows: имя для оконных хранилищ содержит a, : которое не разрешено в ОС Windows. Ошибка исправлена в версии 1.0.1 и 1.1.0

Cf. https://issues.apache.org/jira/browse/KAFKA-6167