#spring #apache-kafka #apache-kafka-streams #spring-cloud-stream
Вопрос:
Я последовал этому примеру, чтобы создать аутентификацию в моем брокере кафке, однако для моего проекта есть два отличия
- Я использую связующее средство потоков кафки, в то время как в примере используется связующее средство кафки.
- У меня есть только один брокер, в то время как в примере используются два брокера.
Когда я запускаю свое приложение, оно возвращает следующую ошибку:
https://gist.github.com/JacsonF/7363e8d7f4f07a5bc77c28fb1e882674
Моя функция:
@Bean
public Function<KStream<Object, String>, KStream<?, String>[]> receive() {
Predicate<Object, String> isCompensado = (k, v) -> v.contains("XPTO");
Predicate<Object, String> isntCompensado = (k, v) -> !v.contains("XPTO");
return input -> input.branch(isCompensado, isntCompensado);
}
Я хотел бы использовать только свойства моего приложения для аутентификации, без внешнего файла, как в примере.
Как я могу это сделать?
Комментарии:
1. Можете ли вы дать больше из своего стека? Нам нужно увидеть причину из трассировки, чтобы понять, в чем основная причина проблем, с которыми вы сталкиваетесь.
2. спасибо за ответ, я обновил и загрузил полный журнал, чтобы он понял
3. Вы уверены, что у вас есть правильная настройка
KafkaClient
в вашем файле конфигурации? Смотрите этот файл для ознакомления с тем, как настроить среду Kakfa: github.com/spring-cloud/spring-cloud-stream-samples/blob/main/…4. В частности, вам нужна
Client
информация в вашем файле конфигурации.5. @sobychacko Я создаю эту реализацию, я не знаю, лучший ли подход, если вы проверите и ответите мне, это было бы здорово. Возможно, я не был ясен, сообщая о проблеме, я попытался лучше объяснить ее в представленном мной проекте
Ответ №1:
Возникла проблема с тем, как конфигурация JAAS обрабатывалась в связывателе потоков Кафки для Spring Cloud Stream. Более подробную информацию смотрите в этом выпуске. Теперь это исправлено. Убедитесь, что вы используете версию 3.1.4-SNAPSHOT
. Вот пример приложения, которое подтверждает это.
Обновить
Были выпущены привязки Кафки для Spring Cloud Stream, исправлена версия 3.1.4.
Если вы используете spring-cloud-dependencies
в своем pom для управления зависимостями, обновите его, например, до (по крайней мере) версии 2020.0.4:
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>2020.0.4</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
Комментарии:
1. Это здорово! для меня это прекрасно работает, большое вам спасибо.
2. @JacsonF Если вы должны принять этот ответ, новая версия SpCS Kafka Binder действительно устраняет проблему.
Ответ №2:
Это прекрасно работает для меня:
spring.kafka.jaas.enabled=true
spring.kafka.properties.security.protocol=SASL_SSL
spring.kafka.properties.sasl.mechanism=PLAIN
spring.kafka.properties.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="username" password="password";
Выход:
2021-05-18 18:50:56.505 INFO 75288 --- [ main] o.s.c.s.binder.DefaultBinderFactory : Creating binder: ktable
2021-05-18 18:50:56.595 INFO 75288 --- [ main] o.s.c.s.binder.DefaultBinderFactory : Caching the binder: ktable
2021-05-18 18:50:56.596 INFO 75288 --- [ main] o.s.c.s.binder.DefaultBinderFactory : Retrieving cached binder: ktable
2021-05-18 18:50:56.722 INFO 75288 --- [ main] o.a.k.clients.admin.AdminClientConfig : AdminClientConfig values:
............ (property)=(value) ............
sasl.mechanism = PLAIN
security.protocol = SASL_SSL
............ (property)=(value) ............
2021-05-18 18:50:56.960 INFO 75288 --- [ main] o.a.k.c.s.authenticator.AbstractLogin : Successfully logged in.
2021-05-18 18:50:57.131 WARN 75288 --- [ main] o.a.k.clients.admin.AdminClientConfig : The configuration 'sasl.jaas.config' was supplied but isn't a known config.
2021-05-18 18:50:57.134 INFO 75288 --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka version: 2.6.0
2021-05-18 12:46:31.423 INFO 74020 --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka commitId: 62abe01bee039651
2021-05-18 12:46:31.423 INFO 74020 --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka startTimeMs: 1621367191408
2021-05-18 12:46:31.423 INFO 74020 --- [ main] o.s.c.s.b.k.p.KafkaTopicProvisioner : Auto creation of topics is disabled.
2021-05-18 12:46:31.456 INFO 74020 --- [ main] org.apache.kafka.streams.StreamsConfig : StreamsConfig values:
Я попробовал следующее, но это не сработало для меня:
Использование свойств из приведенной выше ссылки для установки тех же значений, что и в первом случае, приведет к следующей ошибке:
2021-05-18 18:59:44.140 INFO 73444 --- [ main] o.s.c.s.binder.DefaultBinderFactory : Creating binder: ktable
2021-05-18 18:59:44.231 INFO 73444 --- [ main] o.s.c.s.binder.DefaultBinderFactory : Caching the binder: ktable
2021-05-18 18:59:44.232 INFO 73444 --- [ main] o.s.c.s.binder.DefaultBinderFactory : Retrieving cached binder: ktable
2021-05-18 18:59:44.352 INFO 73444 --- [ main] o.a.k.clients.admin.AdminClientConfig : AdminClientConfig values:
............ (property)=(value) .............
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
............ (property)=(value) ............
2021-05-18 18:59:44.596 INFO 73444 --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka version: 2.6.0
2021-05-18 18:59:44.597 INFO 73444 --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka commitId: 62abe01bee039651
2021-05-18 18:59:44.597 INFO 73444 --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka startTimeMs: 1621389584594
2021-05-18 18:59:44.599 INFO 73444 --- [ main] o.s.c.s.b.k.p.KafkaTopicProvisioner : Auto creation of topics is disabled.
2021-05-18 18:59:44.611 INFO 73444 --- [| adminclient-1] o.a.k.c.a.i.AdminMetadataManager : [AdminClient clientId=adminclient-1] Metadata update failed
org.apache.kafka.common.errors.TimeoutException: Call(callName=fetchMetadata, deadlineMs=1621389614599, tries=1, nextAllowedTryMs=-9223372036854775709) timed out at 9223372036854775807 after 1 attempt(s)
Caused by: org.apache.kafka.common.errors.TimeoutException: Timed out waiting to send the call.
Из ведения журнала вы можете видеть, что второй способ НЕ использует указанные вами значения и нет «Успешного входа в систему».
Кстати, я использую следующие зависимости:
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka-streams</artifactId>
<version>3.1.3-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream</artifactId>
<version>3.1.3-SNAPSHOT</version>
</dependency>
и следующие хранилища:
<repositories>
<repository>
<id>spring-snapshots</id>
<name>Spring Snapshots</name>
<url>https://repo.spring.io/libs-snapshot-local</url>
<snapshots>
<enabled>true</enabled>
</snapshots>
<releases>
<enabled>false</enabled>
</releases>
</repository>
<repository>
<id>spring-milestones</id>
<name>Spring Milestones</name>
<url>https://repo.spring.io/libs-milestone-local</url>
<snapshots>
<enabled>false</enabled>
</snapshots>
</repository>
<repository>
<id>confluent</id>
<url>https://packages.confluent.io/maven/</url>
</repository>
</repositories>