#apache-kafka #apache-kafka-streams
#apache-kafka #apache-kafka-streams
Вопрос:
Я пытаюсь понять, что возможно и как думать при работе Kafka Streams
.
Пример использования:
Существует тема под названием Transactions
:
- key -> transactionReference (строка)
- значение -> временная метка, утверждено / отменено (строка JSON)
Я хочу создать кеш, в котором будут храниться все последние транзакции (последние 10 минут).
Клиент rest может запрашивать кэш, предоставляя ссылку на транзакцию.
Вопросы:
- Подходит ли Kafka streams (вместе с его материализованными представлениями) для реализации такого кэша?
- Если да, как бы вы это сделали? Помните, что необходимо сохранять только последние 10 минут транзакций и отбрасывать старые.
- Если нет, то почему бы и нет?
Ответ №1:
Да, это очень хорошая идея для его разработки kafka-streams
. Как это сделать?
- Сначала создайте класс, который представляет значения кэша:
class Transaction {
Instant createTime;
Status status;
String transactionReference;
}
- Во-вторых, создайте класс, который обрабатывает логику кэша — реализует
org.apache.kafka.streams.kstream.Transformer<K, V, R>
:
public class TransactionsCache implements Transformer<String, Transaction, KeyValue<String, Transaction>> {
private final long maintainDurationMs = TimeUnit.MINUTES.toMillis(10);
private KeyValueStore<String, Transaction> transactions;
@Override
public void init(ProcessorContext context) {
this.transactions = context.getStateStore("transactions-store");
context.schedule(Duration.ofMillis(5), PunctuationType.WALL_CLOCK_TIME,
timestamp -> transactions.all()
.forEachRemaining(kV -> {
if (hasExpired(kV.value.getCreateTime().toEpochMilli(), timestamp)) {
transactions.delete(kV.key);
}
}));
}
private boolean hasExpired(final long eventTimestamp, final long currentStreamTimeMs) {
return (currentStreamTimeMs - eventTimestamp) > maintainDurationMs;
}
@Override
public KeyValue<String, Transaction> transform(String key, Transaction transaction) {
Transaction t = this.transactions.get(transaction.getTransactionReference());
if (t == null) {
transactions.put(transaction.getTransactionReference(), transaction);
}
return null;
}
@Override
public void close() {
}
}
- Затем зарегистрируйте transformer в topology:
static StreamsBuilder buildKafkaStreamsTopology() {
StreamsBuilder builder = new StreamsBuilder();
StoreBuilder<KeyValueStore<String, Transaction>> transferProcessKeyValueStore = Stores
.keyValueStoreBuilder(Stores.inMemoryKeyValueStore(("transactions-store"), Serdes.String(), JsonSerdes.forA(Transaction.class));
builder.addStateStore(transferProcessKeyValueStore);
builder.stream(TRANSACTIONS, Consumed.with(Serdes.String(), JsonSerdes.forA(Transaction.class)))
.transform(TransactionsCache::new, "transactions-store");
return builder;
}
- Следующий шаг — считывание данных в http-контроллере:
@RestController
public class TransactionsController {
private final KafkaStreams kafkaStreams;
public TransactionsController(KafkaStreams kafkaStreams) {
this.kafkaStreams = kafkaStreams;
}
@GetMapping(value = "/transactions/{transactionReference}", produces = MediaType.APPLICATION_JSON_VALUE)
Transaction getTransaction(@PathVariable("transactionReference") String transactionReference) {
ReadOnlyKeyValueStore<String, Transaction> store = kafkaStreams.store(
StoreQueryParameters.fromNameAndType("transactions-store", QueryableStoreTypes.keyValueStore()));
return store.get(transactionReference);
}
}
- Последнее. Помните, что этот кэш в памяти по умолчанию разделен на разделы, поэтому в случае запуска многих экземпляров вашего приложения вам нужно добавить некоторый метод RPC для получения данных из другого экземпляра в случае промаха (интерактивные запросы Kafka), здесь у вас есть очень аккуратный пример. Или второе решение заключается в использовании
org.apache.kafka.streams.kstream.GlobalKTable<K, V>
Комментарии:
1. Логика уровня rest может быть выполнена очень просто с помощью azkarrastreams.io
2. Выглядит многообещающе, я изучу это, txh!