Должен ли AbstractListenerContainerFactory действительно закрывать потребителя, которого он использует для проверки тем

#spring-kafka

#spring-kafka

Вопрос:

В org.springframework.kafka.listener.AbstractMessageListenerContainer, при запуске контейнера метод checkTopics проверяет, существует ли подписанная тема на брокере, используя потребителя Kafka, который создается в блоке try with resources .

Когда потребитель закрыт, закрытие каскадируется до многих закрываемых связанных объектов, в частности десериализаторов ключей и значений (см. org.apache .kafka.clients.consumer.KafkaConsumer). В приложении Spring десериализаторы обычно объявляются как компоненты, поэтому на фабрике каждого типа есть только один экземпляр, и хотя большинство десериализаторов реализуют close как no-op, я сталкивался со случаями, когда закрытие десериализатора делает его непригодным для использования с этого момента.

Мне кажется, что, хотя закрытие потребителя звучит разумно, учитывая, что Spring запускает несколько экземпляров, а созданный здесь — просто одноразовый, каскад до компонентов десериализатора является нежелательным последствием, которое, возможно, не было замечено при написании AbstractMessageListenerContainer .

Существует обходной путь — при создании KafkaListenerContainerFactory просто вызовите

 factory.getContainerProperties().setMissingTopicsFatal(false);
  

но это удаляет проверку безопасности для существования темы и кажется чем-то вроде взлома. Действительно ли закрытие потребителя правильно в AbstractMessageListenerContainer?

Ответ №1:

Мы можем и будем использовать AdminClient вместо a, Consumer чтобы проверить, существуют ли темы.

Однако это не панацея, поскольку stop() ping контейнера также закрывает Consumer (ы), поэтому та же проблема будет существовать при перезапуске контейнера (ов).

В подобных случаях лучше позволить Kafka управлять жизненным циклом десериализатора вместо объявления его как компонента.

Комментарии:

1. Это имеет смысл — я должен дважды проверить исходный код — предположительно, если десериализатор просто указан в properties / yml, тогда экземпляры создаются соответствующим образом при создании новых потребителей Kafka — это когда нет конструктора без аргументов или требуется какая-либо другая инициализация, зависящая от компонента, которую я объявилих как бобы и установить их в контейнере.

2. Однако, глядя на DefaultConsumerFactory, это разработано явно с отдельными экземплярами десериализаторов ключей и значений, которые являются общими для всех созданных экземпляров потребителя. Было бы лучше для этого класса использовать DeserializerFactories, которые предоставляют новые экземпляры для каждого потребителя, чтобы любые события жизненного цикла, такие как закрытие, можно было безопасно каскадировать вниз?

3. Да, мы можем сделать что-то подобное; пожалуйста, откройте проблему на GitHub ; и вклад приветствуется ! .

4. Спасибо — сделаю и хотел бы внести свой вклад — если бы какой-либо мой код попал в исходный код Spring, я был бы очень счастлив 🙂

5. Круто; Я имею в виду SerdeFactory интерфейс с шестью методами — сериализатор / десериализатор ключа / значения и ключи / значения для потоков Kafka (все методы, не имеющие настроек по умолчанию). С одной реализацией, которая получает (до) 6 имен компонентов и ApplicationContext использует getBean() каждый раз, поэтому компоненты с областью прототипа будут предоставлять новый для каждого производителя / потребителя / потока. Мы можем обсудить это дальше, когда вы откроете проблему.