Потребитель Reactor Kafka продолжает получать сообщения после отзыва разделов

#apache-kafka #apache-kafka-streams #spring-kafka #project-reactor

#apache-kafka #apache-kafka-streams #spring-kafka #проект-реактор

Вопрос:

Когда узел, на котором я работаю io.projectreactor.kafka:reactor-kafka:1.1.0.RELEASE , перегружается, иногда случается, что потребитель теряет разделы. Я предполагаю, что ему не удается отправлять сердцебиения. Это не та проблема, с которой мне нужна помощь.

 Oct 14 00:14:53 pjnsj [r-coordinator-2].c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-2, groupId=groupId1] Lost previously assigned partitions topic-1-0, topic-2-0, topic-3-0, topic-4-0, topic-5-0
Oct 14 00:14:53 pjnsj [r-coordinator-2].c.MyClass                        : onPartitionsRevoked: [topic-1-0, topic-2-0, topic-3-0, topic-4-0, topic-5-0]
Oct 14 00:14:53 pjnsj [r-coordinator-2].c.MyClass                        : Cleaning up some state A <--- [1]...Some state is cleared when the partition is revoked
Oct 14 00:14:53 pjnsj [r-coordinator-2].coordinator.CoordinatorService   : Successfully revoked partitions:[topic-1-0, topic-2-0, topic-3-0, topic-4-0, topic-5-0]
Oct 14 00:14:53 pjnsj [r-coordinator-2].c.internals.AbstractCoordinator  : [Consumer clientId=consumer-2, groupId=groupId1] (Re-)joining group
Oct 14 00:14:53 pjnsj [r-coordinator-2].c.internals.AbstractCoordinator  : [Consumer clientId=consumer-2, groupId=groupId1] (Re-)joining group
Oct 14 00:14:53 pjnsj [r-coordinator-2].c.internals.AbstractCoordinator  : [Consumer clientId=consumer-2, groupId=groupId1] Member consumer-consumer-2-4eb23c30-ad7c-4453-8336-3d9886895c90 sending LeaveGroup request to coordinator cp-kafka-4.cp-kafka-headless.cp-kafka:9092 (id: 2147483643 rack: null) due to the consumer is being closed

Oct 14 00:15:08 pjnsj ERROR 1 --- [     parallel-2] .c.MyClass   : Ups ops, error consuming messages from Kafka!
Oct 14 00:15:08 pjnsj  
java.lang.NullPointerException: null
    at .c.MyClass(MyClass.java:249)  <--- [2]...flux filter that relies on the state cleaned up earlier
    at reactor.core.publisher.FluxFilter$FilterConditionalSubscriber.onNext(FluxFilter.java:226)
    at reactor.core.publisher.FluxMap$MapConditionalSubscriber.onNext(FluxMap.java:213)
    at reactor.core.publisher.FluxMap$MapConditionalSubscriber.onNext(FluxMap.java:213)
    at reactor.core.publisher.FluxPeekFuseable$PeekConditionalSubscriber.onNext(FluxPeekFuseable.java:845)
    at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.innerNext(FluxConcatMap.java:274)
    at reactor.core.publisher.FluxConcatMap$ConcatMapInner.onNext(FluxConcatMap.java:851)
    at reactor.core.publisher.FluxIterable$IterableSubscription.slowPath(FluxIterable.java:267)
    at reactor.core.publisher.FluxIterable$IterableSubscription.request(FluxIterable.java:225)
    at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.set(Operators.java:2152)
    at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onSubscribe(Operators.java:2026)
    at reactor.core.publisher.FluxIterable.subscribe(FluxIterable.java:161)
    at reactor.core.publisher.FluxIterable.subscribe(FluxIterable.java:86)
    at reactor.core.publisher.Flux.subscribe(Flux.java:8325)
  

My RevokeListener очищает некоторое состояние, которое я сохраняю при использовании [1]. Cleaning up some state A в журнале.

Судя по журналам, некоторые сообщения используются после RevokeListener вызова is . Исключение NullPointerException происходит в части кода, обрабатывающего сообщения.

Возможно ли это?