Поток Кафки: не удается получить данные из хранилища постоянных значений ключей Кафки

#spring #spring-boot #apache-kafka #apache-kafka-streams #spring-kafka

Вопрос:

Я использую потоки Кафки и постоянное хранилище значений ключей в своем приложении. Я использую два хранилища значений ключей и два процессора. Я столкнулся с проблемой с хранилищем состояний, которое совместно используется двумя процессорами. Обработчик имен помещает данные в хранилище имен, а обработчик событий извлекает данные из хранилища имен. Судя по отладке, похоже, что NameProcessor может успешно помещать данные, но когда EventProcessor пытается получить данные из хранилища имен, он не получает никаких данных. Ниже приведен фрагмент кода для класса приложения, Топологии, процессора имен и процессора событий. Кроме того, я использую родительскую версию Spring boot 2.4.3, кафка-потоки версии 2.2.0 и кафка-клиенты версии 2.2.0

         public static void main(String[] args) {

        SpringApplication.run(Application.class, args);

        Properties configs = getKafkaStreamProperties();

        Topology builder = new Topology();

        new ApplicationTopology(builder);

        KafkaStreams stream = new KafkaStreams(builder, configs);

        stream.setUncaughtExceptionHandler((Thread thread, Throwable throwable) -> {
            // here you should examine the throwable/exception and perform an appropriate action!
            logger.error("Uncaught exception in stream, MessageDetail: "  ExceptionUtils.getRootCauseMessage(throwable)   ", Stack Trace: "   throwable.fillInStackTrace());
            Runtime.getRuntime().halt(1);
        });

        Runtime.getRuntime().addShutdownHook(new Thread(stream::close));

        stream.start();
    }

    private static Properties getKafkaStreamProperties() {
        Properties configs = new Properties();
        configs.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, getApplicationId());
        configs.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, getBootstrapServers());
        configs.setProperty(StreamsConfig.RETRIES_CONFIG, getRetries());
        configs.setProperty(StreamsConfig.RETRY_BACKOFF_MS_CONFIG, getRetryBackOffMs());
        configs.setProperty(StreamsConfig.REPLICATION_FACTOR_CONFIG, getReplicationFactor());
        configs.setProperty(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, getMaxPollIntervalMs());
        return configs;
    }

    public class ApplicationTopology {

    public ApplicationTopology (Topology builder) {

        StoreBuilder<KeyValueStore<String, Sensor>> nameStoreBuilder = Stores.
                keyValueStoreBuilder(Stores.persistentKeyValueStore("nameStore"), Serdes.String(), CustomSerdes.getNameSerde()).withCachingEnabled().withLoggingEnabled(new HashMap<>());

        StoreBuilder<KeyValueStore<String, String>> stateStoreBuilder = Stores.
                keyValueStoreBuilder(Stores.persistentKeyValueStore("stateStore"), Serdes.String(), Serdes.String()).withCachingEnabled().withLoggingEnabled(new HashMap<>());


        builder.addSource(AutoOffsetReset.LATEST, "source", Serdes.String().deserializer(), CustomSerdes.getIncomingEventSerde().deserializer(), getInboundTopic())
                .addProcessor(TRANSFORMER, () -> new EventProcessor(), "source")
                .addStateStore(nameStoreBuilder, TRANSFORMER)
                .addSink("sink", getOutboundTopic(), Serdes.String().serializer(), CustomSerdes.getIncomingEventSerde().serializer(), TRANSFORMER);


        //reset to earliest for model config topic as some models could be already on the topic
        builder.addSource(AutoOffsetReset.EARLIEST, "nameStoreSource", Serdes.String().deserializer(), CustomSerdes.getSensorSerde().deserializer(), getInboundSensorUpdateTopic())
                .addProcessor("process", () -> new NameProcessor(), "nameStoreSource")
                .addStateStore(nameStoreBuilder,  TRANSFORMER, "process");

    }

    public ApplicationTopology() {}
    } }



    public class NameProcessor extends AbstractProcessor<String, Sensor> {

    private static final Logger LOGGER = LoggerFactory.getLogger(NameProcessor.class);

    ProcessorContext context;

    private KeyValueStore<String, Name> nameStore;

    private static List<String> externalDeviceIdList = new ArrayList<>();


    @Override
    public void init(ProcessorContext processorContext) {
        this.context = processorContext;
        this.nameStore = (KeyValueStore<String, Name>) context.getStateStore("nameStore");
    }

    @Override
    public void process(String externalDeviceId, Name name) {

        if (StringUtils.isNotBlank(externalDeviceId)) {
            String[] externalDeviceIds = SensorUtils.getExternalDeviceIdsWithoutSuffix(externalDeviceId);

            if (Objects.isNull(name)) {
                Arrays.stream(externalDeviceIds).forEach(id -> {
                    sensorStore.delete(id);
                });
            } else {
                addOrUpdateNameInStore(sensor, externalDeviceIds);
            }
        }

    }

    private void addOrUpdateNameInStore(Sensor sensor, String[] externalDeviceIds) {

        Arrays.stream(externalDeviceIds).forEach(id -> {
            sensorStore.put(id, sensor);

        });

        // context.commit();
    }

}

    public class EventProcessor extends AbstractProcessor<String, IncomingEvent> {

    private static final Logger LOGGER = LoggerFactory.getLogger(EventProcessor.class);

    ProcessorContext context;

    private KeyValueStore<String, Name> nameStore;

    private KeyValueStore<String, String> stateStore;


    @Override
    public void init(ProcessorContext processorContext) {
        this.context = processorContext;

        this.nameStore = (KeyValueStore<String, Name>) context.getStateStore("nameStore");
        this.stateStore = (KeyValueStore<String, String>) context.getStateStore("stateStore");
        
    }

    @Override
    public void process(String key, IncomingEvent value) {

        String correlationId = UUID.randomUUID().toString();

        try {

            String externalDeviceId = value.getExternalDeviceId();

           Name nameFromStore = nameStore.get(externalDeviceId);

}
}
}




In nameFromStore variable, I don't get even value even after storing it in NameProcessor.
 

Комментарии:

1. В итоге я использовал globalStateStore для выполнения своих требований. Это сработало, как и ожидалось.