#java #spring-boot #apache-kafka #apache-kafka-streams
Вопрос:
Я хотел бы получить доступ к глобальной таблице k в контексте, отличном от контекста потоков кафки.
В качестве примера у меня есть это примерное приложение:
@SpringBootApplication
public class App {
public static void main(String[] args) {
SpringApplication.run(App.class, args);
}
}
с помощью этой настроенной глобальной таблицы
@Component
public class Table {
@Bean("myTable")
public GlobalKTable<String, String> buildTopo(@Qualifier("ksConfig") StreamsBuilder builder) {
return builder.globalTable("my-topic",
Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as(
"my-state-store" /* table/store name */)
.withKeySerde(Serdes.String())
.withValueSerde(Serdes.String())
);
}
}
И эта конфигурация потоков кафки:
@Configuration
public class KafkaConfiguration {
@Bean("ksConfig")
public StreamsBuilderFactoryBean kafkaStreams(KafkaProperties kafkaProperties,
@Value("${spring.application.name}") String appName) {
var props = new HashMap<String, Object>(kafkaProperties.getProperties());
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getBootstrapServers());
props.put(StreamsConfig.APPLICATION_ID_CONFIG, appName);
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10 * 1000);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, LogAndContinueExceptionHandler.class);
var stateDir = kafkaProperties.getStreams().getStateDir() != null ?
kafkaProperties.getStreams().getStateDir() : System.getProperty("java.io.tmpdir");
props.put(StreamsConfig.STATE_DIR_CONFIG, stateDir);
var config = new KafkaStreamsConfiguration(props);
return new StreamsBuilderFactoryBean(config);
}
Теперь предположим, что у меня есть конечная точка в Интернете:
@Path("/api")
@Service
public class Web {
@GET
@Path("/test")
@Produces(MediaType.APPLICATION_JSON)
public Response test(@QueryParam("key") String key) {
String value = // get value from global ktable with Key
return Response.status(200).entity(value).build();
}
}
Как я могу получить доступ к таблице k изнутри этого Web::test(...)
метода?
Я попытался автоматически подключить некоторые предполагаемые классы потоков Кафки Web
, такие как KafkaStreams
и ProcessorContext
. Весна выдает ошибку, в которой говорится, что она не может найти эти бобы.
Комментарии:
1. Похоже, вы хотите использовать
InteractiveQueryService
источник2. @OneCricketeer — Я бы хотел избежать использования Spring Cloud, если это вообще возможно. Но это дает мне идеи о том, как добраться до ktable
3. Есть ли какая-то особая причина для этого? В противном случае, вы пытались
@Autowired GlobalKTable<String, String> myTable;
? Так@Bean
как это то, что вы создали?