Как добавить пользовательское хранилище состояний в DSL-процессор Kafka Streams?

#apache-kafka-streams

#apache-kafka-streams

Вопрос:

Для одного из моих приложений Kafka streams мне нужно использовать функции как DSL, так и Processor API. Мой поток потокового приложения

 source -> selectKey -> filter -> aggregate (on a window) -> sink
  

После агрегации мне нужно отправить ОДНО агрегированное сообщение в приемник. Итак, я определяю свою топологию следующим образом

 KStreamBuilder builder = new KStreamBuilder();
KStream<String, String> source = builder.stream(source_stream);
source.selectKey(new MyKeyValueMapper())
      .filterNot((k,v) -> k.equals("UnknownGroup"))
      .process(() -> new MyProcessor());
  

Я определяю пользовательский StateStore и регистрирую его в своем процессоре, как показано ниже

 public class MyProcessor implements Processor<String, String> {

    private ProcessorContext context = null;
    Serde<HashMapStore> invSerde = Serdes.serdeFrom(invJsonSerializer, invJsonDeserializer);


    KeyValueStore<String, HashMapStore> invStore = (KeyValueStore) Stores.create("invStore")
        .withKeys(Serdes.String())
        .withValues(invSerde)
        .persistent()
        .build()
        .get();

    public MyProcessor() {
    }

    @Override
    public void init(ProcessorContext context) {
        this.context = context;
        this.context.register(invStore, false, null); // register the store
        this.context.schedule(10 * 60 * 1000L);
    }

    @Override
    public void process(String partitionKey, String message) {
        try {
            MessageModel smb = new MessageModel(message);
            HashMapStore oldStore = invStore.get(partitionKey);
            if (oldStore == null) {
                oldStore = new HashMapStore();
            }
            oldStore.addSmb(smb);
            invStore.put(partitionKey, oldStore);
        } catch (Exception e) {
           e.printStackTrace();
        }
    }

    @Override
    public void punctuate(long timestamp) {
       // processes all the messages in the state store and sends single aggregate message
    }


    @Override
    public void close() {
        invStore.close();
    }
}
  

Когда я запускаю приложение, я получаю java.lang.NullPointerException

Исключение в потоке «StreamThread-18» java.lang.Исключение NullPointerException в org.apache.kafka.streams.state.internals.MeteredKeyValueStore.flush(MeteredKeyValueStore.java:167) в org.apache.kafka.streams.processor.internals.ProcessorStateManager.сбросить (ProcessorStateManager.java:332) в org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:252) в org.apache.kafka.streams.processor.internals.StreamThread.commitOne(StreamThread.java:446) в org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:434) в org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:422) в org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:340) в org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:218)

Есть идеи, что здесь происходит не так?

Ответ №1:

Вам необходимо зарегистрировать хранилище, которое вы используете вне вашего процессора, используя StreamsBuilder (или KStreamBuilder в более старых версиях). Сначала вы создаете хранилище, затем регистрируете его в StreamsBuilder ( KStreamBuilder ), и когда вы добавляете процессор, вы указываете имя хранилища для соединения процессора и хранилища.

 StreamsBuilder builder = new StreamsBuilder();

// create store
StoreBuilder storeBuilder = Stores.keyValueStoreBuilder(
    Stores.persistentKeyValueStore("invStore"),
    Serdes.String(),
    invSerde));
// register store
builder.addStateStore(storeBuilder);

KStream<String, String> source = builder.stream(source_stream);
source.selectKey(new MyKeyValueMapper())
        .filterNot((k,v) -> k.equals("UnknownGroup"))
        .process(() -> new MyProcessor(), "invStore"); // connect store to processor by providing store name


// older API:

KStreamBuilder builder = new KStreamBuilder();

// create store
StateStoreSupplier storeSupplier = (KeyValueStore)Stores.create("invStore")
    .withKeys(Serdes.String())
    .withValues(invSerde)
    .persistent()
    .build();
// register store
builder.addStateStore(storeSupplier);

KStream<String, String> source = builder.stream(source_stream);
source.selectKey(new MyKeyValueMapper())
        .filterNot((k,v) -> k.equals("UnknownGroup"))
        .process(() -> new MyProcessor(), "invStore"); // connect store to processor by providing store name
  

Комментарии:

1. Это работает. большое спасибо. Просто любопытно узнать, где я могу использовать метод ProcessorContext::register?

2. Вам нужно, только ProcessorContext#register если вы реализуете свое собственное хранилище состояний (используя интерфейс StateStore ). Вам нужно было бы вызвать register в StateStore#init()

3. @MatthiasJ.Sax Если я реализовал свое собственное хранилище состояний и если я не вызываю register в StateStore#init(), то будет ли processorContext.getStateStore(«mystore») работать в методе AbstractProcessor init() или он вернет null? Заранее спасибо.

4. Вам нужно будет зарегистрировать хранилище. В противном случае оно не может быть найдено и getStateStore() возвращается null .