Как сохранить соединение redis открытым при чтении из реактивного API

#redis #reactive-programming #spring-data-redis #lettuce

#redis #реактивное программирование #spring-data-redis #салат

Вопрос:

Я постоянно прослушиваю потоки redis, используя spring reactive api (используя драйвер lettuce). Я использую автономное соединение. Похоже, что цикл событий реактора открывает новое соединение каждый раз, когда он читает сообщения, вместо того, чтобы сохранять соединение открытым. Я вижу много портов TIME_WAIT на своем компьютере, когда запускаю свою программу. Это нормально? Есть ли способ сообщить lettuce о необходимости повторного использования соединения вместо повторного подключения каждый раз?

Это мой код:

 StreamReceiver<String, MapRecord<String, String, String>> receiver = StreamReceiver.create(factory);
return receiver
    .receive(Consumer.from(keyCacheStreamsConfig.getConsumerGroup(), keyCacheStreamsConfig.getConsumer()),
        StreamOffset.create(keyCacheStreamsConfig.getStreamName(), ReadOffset.lastConsumed()))//
    // flatMap reads 256 messages by default and processes them in the given scheduler
    .flatMap(record -> Mono.fromCallable(() -> consumer.consume(record)).subscribeOn(Schedulers.boundedElastic()))//
    .doOnError(t -> {
      log.error("Error processing.", t);
      streamConnections.get(nodeName).setDirty(true);
    })//
    .onErrorContinue((err, elem) -> log.error("Error processing message. Continue listening."))//
    .subscribe();
 

Ответ №1:

Похоже, что библиотека spring-data-redis повторно использует соединение, только если тайм-аут опроса установлен на «0» в параметрах приемника потока и передает его в качестве второго аргумента StreamReceiver.create(factory, options) . Понял, заглянув в исходный код spring-data-redis.