Как получить доступ к глобальной таблице вне контекста потоков кафки в spring boot?

#java #spring-boot #apache-kafka #apache-kafka-streams

Вопрос:

Я хотел бы получить доступ к глобальной таблице k в контексте, отличном от контекста потоков кафки.

В качестве примера у меня есть это примерное приложение:

 @SpringBootApplication
public class App {
    public static void main(String[] args) {
        SpringApplication.run(App.class, args);
    }
}
 

с помощью этой настроенной глобальной таблицы

 @Component
public class Table {

    @Bean("myTable")
    public GlobalKTable<String, String> buildTopo(@Qualifier("ksConfig") StreamsBuilder builder) {
        return builder.globalTable("my-topic",
                Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as(
                                "my-state-store" /* table/store name */)
                        .withKeySerde(Serdes.String())
                        .withValueSerde(Serdes.String())
        );
    }
}
 

И эта конфигурация потоков кафки:

 @Configuration
public class KafkaConfiguration {

    @Bean("ksConfig")
    public StreamsBuilderFactoryBean kafkaStreams(KafkaProperties kafkaProperties,
                                                  @Value("${spring.application.name}") String appName) {
        var props = new HashMap<String, Object>(kafkaProperties.getProperties());
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getBootstrapServers());
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, appName);
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10 * 1000);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
        props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, LogAndContinueExceptionHandler.class);
        var stateDir = kafkaProperties.getStreams().getStateDir() != null ?
                kafkaProperties.getStreams().getStateDir() : System.getProperty("java.io.tmpdir");
        props.put(StreamsConfig.STATE_DIR_CONFIG, stateDir);
        var config = new KafkaStreamsConfiguration(props);
        return new StreamsBuilderFactoryBean(config);
    }
 

Теперь предположим, что у меня есть конечная точка в Интернете:

 @Path("/api")
@Service
public class Web {

    @GET
    @Path("/test")
    @Produces(MediaType.APPLICATION_JSON)
    public Response test(@QueryParam("key") String key) {

        String value =  // get value from global ktable with Key

        return Response.status(200).entity(value).build();
    }
}
 

Как я могу получить доступ к таблице k изнутри этого Web::test(...) метода?

Я попытался автоматически подключить некоторые предполагаемые классы потоков Кафки Web , такие как KafkaStreams и ProcessorContext . Весна выдает ошибку, в которой говорится, что она не может найти эти бобы.

Комментарии:

1. Похоже, вы хотите использовать InteractiveQueryService источник

2. @OneCricketeer — Я бы хотел избежать использования Spring Cloud, если это вообще возможно. Но это дает мне идеи о том, как добраться до ktable

3. Есть ли какая-то особая причина для этого? В противном случае, вы пытались @Autowired GlobalKTable<String, String> myTable; ? Так @Bean как это то, что вы создали?