Как использовать конфигурацию jaas в spring-cloud-stream-binder-kafka-streams

#spring #apache-kafka #apache-kafka-streams #spring-cloud-stream

Вопрос:

Я последовал этому примеру, чтобы создать аутентификацию в моем брокере кафке, однако для моего проекта есть два отличия

  1. Я использую связующее средство потоков кафки, в то время как в примере используется связующее средство кафки.
  2. У меня есть только один брокер, в то время как в примере используются два брокера.

Когда я запускаю свое приложение, оно возвращает следующую ошибку:

https://gist.github.com/JacsonF/7363e8d7f4f07a5bc77c28fb1e882674

Моя функция:

 @Bean
public Function<KStream<Object, String>, KStream<?, String>[]> receive() {
    Predicate<Object, String> isCompensado = (k, v) -> v.contains("XPTO");
    Predicate<Object, String> isntCompensado = (k, v) -> !v.contains("XPTO");
    return input -> input.branch(isCompensado, isntCompensado);
}
 

Я хотел бы использовать только свойства моего приложения для аутентификации, без внешнего файла, как в примере.
Как я могу это сделать?

приложение.yml

Комментарии:

1. Можете ли вы дать больше из своего стека? Нам нужно увидеть причину из трассировки, чтобы понять, в чем основная причина проблем, с которыми вы сталкиваетесь.

2. спасибо за ответ, я обновил и загрузил полный журнал, чтобы он понял

3. Вы уверены, что у вас есть правильная настройка KafkaClient в вашем файле конфигурации? Смотрите этот файл для ознакомления с тем, как настроить среду Kakfa: github.com/spring-cloud/spring-cloud-stream-samples/blob/main/…

4. В частности, вам нужна Client информация в вашем файле конфигурации.

5. @sobychacko Я создаю эту реализацию, я не знаю, лучший ли подход, если вы проверите и ответите мне, это было бы здорово. Возможно, я не был ясен, сообщая о проблеме, я попытался лучше объяснить ее в представленном мной проекте

Ответ №1:

Возникла проблема с тем, как конфигурация JAAS обрабатывалась в связывателе потоков Кафки для Spring Cloud Stream. Более подробную информацию смотрите в этом выпуске. Теперь это исправлено. Убедитесь, что вы используете версию 3.1.4-SNAPSHOT . Вот пример приложения, которое подтверждает это.

Обновить

Были выпущены привязки Кафки для Spring Cloud Stream, исправлена версия 3.1.4.

Если вы используете spring-cloud-dependencies в своем pom для управления зависимостями, обновите его, например, до (по крайней мере) версии 2020.0.4:

 <dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-dependencies</artifactId>
            <version>2020.0.4</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>
 

Комментарии:

1. Это здорово! для меня это прекрасно работает, большое вам спасибо.

2. @JacsonF Если вы должны принять этот ответ, новая версия SpCS Kafka Binder действительно устраняет проблему.

Ответ №2:

Это прекрасно работает для меня:

spring.kafka.jaas.enabled=true

spring.kafka.properties.security.protocol=SASL_SSL

spring.kafka.properties.sasl.mechanism=PLAIN

spring.kafka.properties.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="username" password="password";

Выход:

 2021-05-18 18:50:56.505  INFO 75288 --- [           main] o.s.c.s.binder.DefaultBinderFactory      : Creating binder: ktable
2021-05-18 18:50:56.595  INFO 75288 --- [           main] o.s.c.s.binder.DefaultBinderFactory      : Caching the binder: ktable
2021-05-18 18:50:56.596  INFO 75288 --- [           main] o.s.c.s.binder.DefaultBinderFactory      : Retrieving cached binder: ktable
2021-05-18 18:50:56.722  INFO 75288 --- [           main] o.a.k.clients.admin.AdminClientConfig    : AdminClientConfig values: 
    ............ (property)=(value) ............
    sasl.mechanism = PLAIN
    security.protocol = SASL_SSL
    ............ (property)=(value) ............
2021-05-18 18:50:56.960  INFO 75288 --- [           main] o.a.k.c.s.authenticator.AbstractLogin    : Successfully logged in.
2021-05-18 18:50:57.131  WARN 75288 --- [           main] o.a.k.clients.admin.AdminClientConfig    : The configuration 'sasl.jaas.config' was supplied but isn't a known config.
2021-05-18 18:50:57.134  INFO 75288 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka version: 2.6.0
2021-05-18 12:46:31.423  INFO 74020 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId: 62abe01bee039651
2021-05-18 12:46:31.423  INFO 74020 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka startTimeMs: 1621367191408
2021-05-18 12:46:31.423  INFO 74020 --- [           main] o.s.c.s.b.k.p.KafkaTopicProvisioner      : Auto creation of topics is disabled.
2021-05-18 12:46:31.456  INFO 74020 --- [           main] org.apache.kafka.streams.StreamsConfig   : StreamsConfig values:
 

Я попробовал следующее, но это не сработало для меня:

Использование свойств из приведенной выше ссылки для установки тех же значений, что и в первом случае, приведет к следующей ошибке:

 2021-05-18 18:59:44.140  INFO 73444 --- [           main] o.s.c.s.binder.DefaultBinderFactory      : Creating binder: ktable
2021-05-18 18:59:44.231  INFO 73444 --- [           main] o.s.c.s.binder.DefaultBinderFactory      : Caching the binder: ktable
2021-05-18 18:59:44.232  INFO 73444 --- [           main] o.s.c.s.binder.DefaultBinderFactory      : Retrieving cached binder: ktable
2021-05-18 18:59:44.352  INFO 73444 --- [           main] o.a.k.clients.admin.AdminClientConfig    : AdminClientConfig values:
    ............ (property)=(value) .............
    sasl.mechanism = GSSAPI
    security.protocol = PLAINTEXT
    ............ (property)=(value) ............
2021-05-18 18:59:44.596  INFO 73444 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka version: 2.6.0
2021-05-18 18:59:44.597  INFO 73444 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId: 62abe01bee039651
2021-05-18 18:59:44.597  INFO 73444 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka startTimeMs: 1621389584594
2021-05-18 18:59:44.599  INFO 73444 --- [           main] o.s.c.s.b.k.p.KafkaTopicProvisioner      : Auto creation of topics is disabled.
2021-05-18 18:59:44.611  INFO 73444 --- [| adminclient-1] o.a.k.c.a.i.AdminMetadataManager         : [AdminClient clientId=adminclient-1] Metadata update failed

org.apache.kafka.common.errors.TimeoutException: Call(callName=fetchMetadata, deadlineMs=1621389614599, tries=1, nextAllowedTryMs=-9223372036854775709) timed out at 9223372036854775807 after 1 attempt(s)
Caused by: org.apache.kafka.common.errors.TimeoutException: Timed out waiting to send the call.
 

Из ведения журнала вы можете видеть, что второй способ НЕ использует указанные вами значения и нет «Успешного входа в систему».

Кстати, я использую следующие зависимости:

     <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-stream-binder-kafka-streams</artifactId>
        <version>3.1.3-SNAPSHOT</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-stream</artifactId>
        <version>3.1.3-SNAPSHOT</version>
    </dependency>
 

и следующие хранилища:

 <repositories>
    <repository>
        <id>spring-snapshots</id>
        <name>Spring Snapshots</name>
        <url>https://repo.spring.io/libs-snapshot-local</url>
        <snapshots>
            <enabled>true</enabled>
        </snapshots>
        <releases>
            <enabled>false</enabled>
        </releases>
    </repository>
    <repository>
        <id>spring-milestones</id>
        <name>Spring Milestones</name>
        <url>https://repo.spring.io/libs-milestone-local</url>
        <snapshots>
            <enabled>false</enabled>
        </snapshots>
    </repository>
    <repository>
        <id>confluent</id>
        <url>https://packages.confluent.io/maven/</url>
    </repository>
</repositories>