Как получить доступ к серверу реестра confluent schema, защищенному паролем, с помощью Spring cloud stream?

#apache-kafka #avro #spring-cloud-stream #confluent-schema-registry #aiven

#apache-kafka #avro #весна-облако-поток #слияние-схема-реестр #айвен

Вопрос:

Я использую spring cloud stream вместе с реестром схем Aiven, который использует реестр схем confluent. Реестр схем Aiven защищен паролем. На основе этих инструкций необходимо установить эти два параметра конфигурации для успешного доступа к серверу реестра schema.

  props.put("basic.auth.credentials.source", "USER_INFO");
 props.put("basic.auth.user.info", "avnadmin:schema-reg-password");
 

Все в порядке, когда я использую только драйверы kafka от vanilla java, но если я использую Spring cloud stream, я не знаю, как ввести эти два параметра. На данный момент я добавляю "basic.auth.user.info" и "basic.auth.credentials.source" "spring.cloud.stream.kafka.binder.configuration" в application.yml файл.

Делая это, я попадаю "401 Unauthorized" на строку, где схема хочет зарегистрироваться.

Обновление 1:

Основываясь на предложении ‘Ali n, я обновил способ настройки компонента SchemaRegistryClient, чтобы он знал о контексте SSL.

 @Bean
public SchemaRegistryClient schemaRegistryClient(
    @Value("${spring.cloud.stream.schemaRegistryClient.endpoint}") String endpoint) {
  try {
    final KeyStore keyStore = KeyStore.getInstance("PKCS12");
    keyStore.load(new FileInputStream(
            new File("path/to/client.keystore.p12")),
        "secret".toCharArray());

    final KeyStore trustStore = KeyStore.getInstance("JKS");
    trustStore.load(new FileInputStream(
            new File("path/to/client.truststore.jks")),
        "secret".toCharArray());

    TrustStrategy acceptingTrustStrategy = (X509Certificate[] chain, String authType) -> true;

    SSLContext sslContext = SSLContextBuilder
        .create()
        .loadKeyMaterial(keyStore, "secret".toCharArray())
        .loadTrustMaterial(trustStore, acceptingTrustStrategy)
        .build();

    HttpClient httpClient = HttpClients.custom().setSSLContext(sslContext).build();
    ClientHttpRequestFactory requestFactory = new HttpComponentsClientHttpRequestFactory(
        httpClient);
    ConfluentSchemaRegistryClient schemaRegistryClient = new ConfluentSchemaRegistryClient(
        new RestTemplate(requestFactory));
    schemaRegistryClient.setEndpoint(endpoint);
    return schemaRegistryClient;
  } catch (Exception ex) {
    ex.printStackTrace();
    return null;
  }
}
 

Это помогло избавиться от ошибки при запуске приложения и зарегистрировало схему. Однако всякий раз, когда приложение хотело отправить сообщение в Kafka, снова выдавалась новая ошибка. Наконец, это также было исправлено ответом mmelsen.

Комментарии:

1. Не могли бы вы поделиться всеми свойствами, которые вы добавили в приложение. файл yml?

2. @Milad Вы нашли решение для этого?

3. @mmelsen У меня был разговор с Али н. Я думаю, что он нашел решение. Он опубликует его, как только мы в этом убедимся.

Ответ №1:

Я столкнулся с той же проблемой, что и в ситуации, в которой я оказался, — подключиться к защищенному реестру схемы, размещенному в aiven и защищенному базовой аутентификацией. Для того чтобы заставить его работать, мне пришлось настроить следующие свойства:

 spring.kafka.properties.schema.registry.url=https://***.aiven***.com:port
spring.kafka.properties.basic.auth.credentials.source=USER_INFO
spring.kafka.properties.basic.auth.user.info=username:password
 

другими свойствами моего связующего являются:

 spring.cloud.stream.binders.input.type=kafka
spring.cloud.stream.binders.input.environment.spring.cloud.stream.kafka.binder.brokers=https://***.aiven***.com:port <-- different from the before mentioned port
spring.cloud.stream.binders.input.environment.spring.cloud.stream.kafka.binder.configuration.security.protocol=SSL
spring.cloud.stream.binders.input.environment.spring.cloud.stream.kafka.binder.configuration.ssl.truststore.location=truststore.jks
spring.cloud.stream.binders.input.environment.spring.cloud.stream.kafka.binder.configuration.ssl.truststore.password=secret
spring.cloud.stream.binders.input.environment.spring.cloud.stream.kafka.binder.configuration.ssl.keystore.type=PKCS12
spring.cloud.stream.binders.input.environment.spring.cloud.stream.kafka.binder.configuration.ssl.keystore.location=clientkeystore.p12
spring.cloud.stream.binders.input.environment.spring.cloud.stream.kafka.binder.configuration.ssl.keystore.password=secret
spring.cloud.stream.binders.input.environment.spring.cloud.stream.kafka.binder.configuration.ssl.key.password=secret
spring.cloud.stream.binders.input.environment.spring.cloud.stream.kafka.binder.configuration.value.deserializer=io.confluent.kafka.serializers.KafkaAvroDeserializer
spring.cloud.stream.binders.input.environment.spring.cloud.stream.kafka.streams.binder.autoCreateTopics=false
 

что на самом деле происходит, так это то, что Spring cloud stream добавит spring.kafka.properties.basic* в DefaultKafkaConsumerFactory, и это добавит конфигурацию в KafkaConsumer. В какой-то момент во время инициализации spring kafka создается CachedSchemaRegistryClient, который снабжен этими свойствами. Этот клиент содержит метод с именем configureRestService, который проверяет, содержит ли карта свойств «basic.auth.credentials.source». Поскольку мы предоставляем это через spring.kafka.properties, он найдет это свойство и позаботится о создании соответствующих заголовков при доступе к конечной точке реестра схемы.

надеюсь, это сработает и для вас.

Я использую версию spring cloud по Гринвичу.SR1, spring-boot-starter 2.1.4.RELEASE, avro-версия 1.8.2 и confluent.версия 5.2.1

Комментарии:

1. если вы используете это, можете ли вы также сказать мне, как я могу обеспечить, чтобы схема извлекалась из реестра вместо использования local?

Ответ №2:

Конфигурация binder обрабатывает только хорошо известные свойства потребителя и производителя.

Вы можете установить произвольные свойства на уровне привязки.

 spring.cloud.stream.kafka.binding.<binding>.consumer.configuration.basic.auth...
 

Комментарии:

1. переместил параметры в раздел «spring.cloud.stream.kafka.bindings.output.producer.configuration», но ошибка stacktrace не изменилась

Ответ №3:

Поскольку Aiven использует SSL для протокола безопасности Kafka, для аутентификации требуется использовать сертификаты.

Вы можете перейти на эту страницу, чтобы понять, как это работает. В двух словах, вам нужно выполнить следующую команду для создания сертификатов и их импорта:

 openssl pkcs12 -export -inkey service.key -in service.cert -out client.keystore.p12 -name service_key
keytool -import -file ca.pem -alias CA -keystore client.truststore.jks
 

Затем вы можете использовать следующие свойства для использования сертификатов:

 spring.cloud.stream.kafka.streams.binder:
  configuration:
    security.protocol: SSL
    ssl.truststore.location: client.truststore.jks
    ssl.truststore.password: secret
    ssl.keystore.type: PKCS12
    ssl.keystore.location: client.keystore.p12
    ssl.keystore.password: secret
    ssl.key.password: secret
    key.serializer: org.apache.kafka.common.serialization.StringSerializer
    value.serializer: org.apache.kafka.common.serialization.StringSerializer
 

Комментарии:

1. это отлично работает для самой Kafka, но не помогает с сервером реестра schema