Создание кэша с потоками Kafka

#apache-kafka #apache-kafka-streams

#apache-kafka #apache-kafka-streams

Вопрос:

Я пытаюсь понять, что возможно и как думать при работе Kafka Streams .

Пример использования:

Существует тема под названием Transactions :

  • key -> transactionReference (строка)
  • значение -> временная метка, утверждено / отменено (строка JSON)

Я хочу создать кеш, в котором будут храниться все последние транзакции (последние 10 минут).

Клиент rest может запрашивать кэш, предоставляя ссылку на транзакцию.

Вопросы:

  1. Подходит ли Kafka streams (вместе с его материализованными представлениями) для реализации такого кэша?
  2. Если да, как бы вы это сделали? Помните, что необходимо сохранять только последние 10 минут транзакций и отбрасывать старые.
  3. Если нет, то почему бы и нет?

Ответ №1:

Да, это очень хорошая идея для его разработки kafka-streams . Как это сделать?

  1. Сначала создайте класс, который представляет значения кэша:
 class Transaction {
 Instant createTime;
 Status status;
 String transactionReference;
}
 
  1. Во-вторых, создайте класс, который обрабатывает логику кэша — реализует org.apache.kafka.streams.kstream.Transformer<K, V, R> :
 public class TransactionsCache implements Transformer<String, Transaction, KeyValue<String, Transaction>> {

    private final long maintainDurationMs = TimeUnit.MINUTES.toMillis(10);

    private KeyValueStore<String, Transaction> transactions;

    @Override
    public void init(ProcessorContext context) {
        this.transactions = context.getStateStore("transactions-store");
        context.schedule(Duration.ofMillis(5), PunctuationType.WALL_CLOCK_TIME,
            timestamp -> transactions.all()
                .forEachRemaining(kV -> {
                    if (hasExpired(kV.value.getCreateTime().toEpochMilli(), timestamp)) {
                        transactions.delete(kV.key);
                    }
                }));
    }

    private boolean hasExpired(final long eventTimestamp, final long currentStreamTimeMs) {
        return (currentStreamTimeMs - eventTimestamp) > maintainDurationMs;
    }

    @Override
    public KeyValue<String, Transaction> transform(String key, Transaction transaction) {
        Transaction t = this.transactions.get(transaction.getTransactionReference());
        if (t == null) {
            transactions.put(transaction.getTransactionReference(), transaction);
        }
        return null;
    }

    @Override
    public void close() {

    }
}
 
  1. Затем зарегистрируйте transformer в topology:
     static StreamsBuilder buildKafkaStreamsTopology() {
        StreamsBuilder builder = new StreamsBuilder();
        
        StoreBuilder<KeyValueStore<String, Transaction>> transferProcessKeyValueStore = Stores
            .keyValueStoreBuilder(Stores.inMemoryKeyValueStore(("transactions-store"), Serdes.String(), JsonSerdes.forA(Transaction.class));
        builder.addStateStore(transferProcessKeyValueStore);

        builder.stream(TRANSACTIONS, Consumed.with(Serdes.String(), JsonSerdes.forA(Transaction.class)))
            .transform(TransactionsCache::new, "transactions-store");

        return builder;
    }
 
  1. Следующий шаг — считывание данных в http-контроллере:
 @RestController
public class TransactionsController {

    private final KafkaStreams kafkaStreams;

    public TransactionsController(KafkaStreams kafkaStreams) {
        this.kafkaStreams = kafkaStreams;
    }

    @GetMapping(value = "/transactions/{transactionReference}", produces = MediaType.APPLICATION_JSON_VALUE)
    Transaction getTransaction(@PathVariable("transactionReference") String transactionReference) {
        ReadOnlyKeyValueStore<String, Transaction> store = kafkaStreams.store(
            StoreQueryParameters.fromNameAndType("transactions-store", QueryableStoreTypes.keyValueStore()));

        return store.get(transactionReference);
    }
}

 
  1. Последнее. Помните, что этот кэш в памяти по умолчанию разделен на разделы, поэтому в случае запуска многих экземпляров вашего приложения вам нужно добавить некоторый метод RPC для получения данных из другого экземпляра в случае промаха (интерактивные запросы Kafka), здесь у вас есть очень аккуратный пример. Или второе решение заключается в использовании org.apache.kafka.streams.kstream.GlobalKTable<K, V>

Комментарии:

1. Логика уровня rest может быть выполнена очень просто с помощью azkarrastreams.io

2. Выглядит многообещающе, я изучу это, txh!