#spring-kafka #spring-cloud-stream #spring-cloud-stream-binder-kafka #spring-messaging #spring-cloud-function
#весна-кафка #весна-облако-поток #spring-cloud-stream-binder-kafka #весенние сообщения #функция весеннего облака
Вопрос:
Я использую StreamBridge для отправки сообщений на тему Кафки с контроллера. Все работало нормально, пока я не начал использовать CompletableFuture, чтобы заставить операцию выполняться асинхронно.
У меня есть простой контроллер rest и метод синхронизации OpenProject, который вызывает службу, которая в конце отправляет сообщение на Кафку с помощью StreamBridge, и другой метод контроллера, который обертывает метод синхронизации в CompletableFuture, такой как CompletableFuture.supplyAsync(()=gt;{OpenProject ();})
При использовании обернутого HTTP-вызова я получаю исключение при публикации сообщения на Kafka, исключение типа «Класс org.springframework.kafka.support.сериализатор.Не удалось найти StringOrBytesSerializer».
Ошибка всегда возникает при каждом повторном запросе HTTP (sych или asynch), если первый HTTP-запрос после перезапуска контейнера относится к методу aync, но никогда не происходит, если первый HTTP-запрос после перезапуска контейнера относится к синхронному.
Возможно, я делаю что-то не так, но я не знаю, что именно.
Вот трассировка стека
Спасибо.
2021-12-03 07:13:13.158 ERROR [NDE-PROJECT-MANAGEMENT-SERVICE,,] 7 --- [nPool-worker-19] o.s.c.s.b.k.p.KafkaTopicProvisioner : Failed to obtain partition information org.apache.kafka.common.config.ConfigException: Invalid value org.springframework.kafka.support.serializer.StringOrBytesSerializer for configuration key.serializer: Class org.springframework.kafka.support.serializer.StringOrBytesSerializer could not be found. at org.apache.kafka.common.config.ConfigDef.parseType(ConfigDef.java:729) ~[kafka-clients-2.7.1.jar!/:na] at org.apache.kafka.common.config.ConfigDef.parseValue(ConfigDef.java:475) ~[kafka-clients-2.7.1.jar!/:na] at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:468) ~[kafka-clients-2.7.1.jar!/:na] at org.apache.kafka.common.config.AbstractConfig.lt;initgt;(AbstractConfig.java:108) ~[kafka-clients-2.7.1.jar!/:na] at org.apache.kafka.common.config.AbstractConfig.lt;initgt;(AbstractConfig.java:129) ~[kafka-clients-2.7.1.jar!/:na] at org.apache.kafka.clients.producer.ProducerConfig.lt;initgt;(ProducerConfig.java:536) ~[kafka-clients-2.7.1.jar!/:na] at org.apache.kafka.clients.producer.KafkaProducer.lt;initgt;(KafkaProducer.java:330) ~[kafka-clients-2.7.1.jar!/:na] at org.apache.kafka.clients.producer.KafkaProducer.lt;initgt;(KafkaProducer.java:291) ~[kafka-clients-2.7.1.jar!/:na] at org.springframework.kafka.core.DefaultKafkaProducerFactory.createRawProducer(DefaultKafkaProducerFactory.java:743) ~[spring-kafka-2.7.8.jar!/:2.7.8] at org.springframework.kafka.core.DefaultKafkaProducerFactory.createKafkaProducer(DefaultKafkaProducerFactory.java:584) ~[spring-kafka-2.7.8.jar!/:2.7.8] at org.springframework.kafka.core.DefaultKafkaProducerFactory.doCreateProducer(DefaultKafkaProducerFactory.java:544) ~[spring-kafka-2.7.8.jar!/:2.7.8] at org.springframework.kafka.core.DefaultKafkaProducerFactory.createProducer(DefaultKafkaProducerFactory.java:519) ~[spring-kafka-2.7.8.jar!/:2.7.8] at org.springframework.kafka.core.DefaultKafkaProducerFactory.createProducer(DefaultKafkaProducerFactory.java:513) ~[spring-kafka-2.7.8.jar!/:2.7.8] at org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder.lambda$createProducerMessageHandler$0(KafkaMessageChannelBinder.java:396) ~[spring-cloud-stream-binder-kafka-3.1.4.jar!/:3.1.4] at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.lambda$getPartitionsForTopic$6(KafkaTopicProvisioner.java:535) ~[spring-cloud-stream-binder-kafka-core-3.1.4.jar!/:3.1.4] at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:329) ~[spring-retry-1.3.1.jar!/:na] at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:209) ~[spring-retry-1.3.1.jar!/:na] at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.getPartitionsForTopic(KafkaTopicProvisioner.java:530) ~[spring-cloud-stream-binder-kafka-core-3.1.4.jar!/:3.1.4] at org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder.createProducerMessageHandler(KafkaMessageChannelBinder.java:394) ~[spring-cloud-stream-binder-kafka-3.1.4.jar!/:3.1.4] at org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder.createProducerMessageHandler(KafkaMessageChannelBinder.java:158) ~[spring-cloud-stream-binder-kafka-3.1.4.jar!/:3.1.4] at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder.doBindProducer(AbstractMessageChannelBinder.java:226) ~[spring-cloud-stream-3.1.4.jar!/:3.1.4] at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder.doBindProducer(AbstractMessageChannelBinder.java:91) ~[spring-cloud-stream-3.1.4.jar!/:3.1.4] at org.springframework.cloud.stream.binder.AbstractBinder.bindProducer(AbstractBinder.java:152) ~[spring-cloud-stream-3.1.4.jar!/:3.1.4] at org.springframework.cloud.stream.binding.BindingService.doBindProducer(BindingService.java:320) ~[spring-cloud-stream-3.1.4.jar!/:3.1.4] at org.springframework.cloud.stream.binding.BindingService.bindProducer(BindingService.java:285) ~[spring-cloud-stream-3.1.4.jar!/:3.1.4] at org.springframework.cloud.stream.function.StreamBridge.resolveDestination(StreamBridge.java:256) ~[spring-cloud-stream-3.1.4.jar!/:3.1.4] at org.springframework.cloud.stream.function.StreamBridge.send(StreamBridge.java:202) ~[spring-cloud-stream-3.1.4.jar!/:3.1.4] at org.springframework.cloud.stream.function.StreamBridge.send(StreamBridge.java:156) ~[spring-cloud-stream-3.1.4.jar!/:3.1.4] at org.springframework.cloud.stream.function.StreamBridge.send(StreamBridge.java:136) ~[spring-cloud-stream-3.1.4.jar!/:3.1.4] at com.example.test.projectmanagement.services.ProjectConfigurationInformationProducer.sendProjectUpdate(ProjectConfigurationInformationProducer.java:35) ~[classes!/:na] at com.example.test.projectmanagement.services.ProjectManagementService.close(ProjectManagementService.java:191) ~[classes!/:na] at com.example.test.projectmanagement.services.ProjectManagementService$FastClassBySpringCGLIB$798d48c1.invoke(lt;generatedgt;) ~[classes!/:na] at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:218) ~[spring-core-5.3.12.jar!/:5.3.12] at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.invokeJoinpoint(CglibAopProxy.java:783) ~[spring-aop-5.3.12.jar!/:5.3.12] at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163) ~[spring-aop-5.3.12.jar!/:5.3.12] at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:753) ~[spring-aop-5.3.12.jar!/:5.3.12] at org.springframework.aop.interceptor.ExposeInvocationInterceptor.invoke(ExposeInvocationInterceptor.java:97) ~[spring-aop-5.3.12.jar!/:5.3.12] at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186) ~[spring-aop-5.3.12.jar!/:5.3.12] at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:753) ~[spring-aop-5.3.12.jar!/:5.3.12] at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:698) ~[spring-aop-5.3.12.jar!/:5.3.12] at com.example.test.projectmanagement.services.ProjectManagementService$EnhancerBySpringCGLIB$52d5281d.close(lt;generatedgt;) ~[classes!/:na] at com.example.test.projectmanagement.controllers.ProjectManagementController.closeProject(ProjectManagementController.java:283) ~[classes!/:na] at com.example.test.projectmanagement.controllers.ProjectManagementController.lambda$closeProjectAsync$6(ProjectManagementController.java:303) ~[classes!/:na] at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1700) ~[na:na] at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.exec(CompletableFuture.java:1692) ~[na:na] at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290) ~[na:na] at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1020) ~[na:na] at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1656) ~[na:na] at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1594) ~[na:na] at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:183) ~[na:na]