#apache-kafka #avro #spring-cloud-stream #confluent-schema-registry #aiven
#apache-kafka #avro #весна-облако-поток #слияние-схема-реестр #айвен
Вопрос:
Я использую spring cloud stream вместе с реестром схем Aiven, который использует реестр схем confluent. Реестр схем Aiven защищен паролем. На основе этих инструкций необходимо установить эти два параметра конфигурации для успешного доступа к серверу реестра schema.
props.put("basic.auth.credentials.source", "USER_INFO");
props.put("basic.auth.user.info", "avnadmin:schema-reg-password");
Все в порядке, когда я использую только драйверы kafka от vanilla java, но если я использую Spring cloud stream, я не знаю, как ввести эти два параметра. На данный момент я добавляю "basic.auth.user.info"
и "basic.auth.credentials.source"
"spring.cloud.stream.kafka.binder.configuration"
в application.yml
файл.
Делая это, я попадаю "401 Unauthorized"
на строку, где схема хочет зарегистрироваться.
Обновление 1:
Основываясь на предложении ‘Ali n, я обновил способ настройки компонента SchemaRegistryClient, чтобы он знал о контексте SSL.
@Bean
public SchemaRegistryClient schemaRegistryClient(
@Value("${spring.cloud.stream.schemaRegistryClient.endpoint}") String endpoint) {
try {
final KeyStore keyStore = KeyStore.getInstance("PKCS12");
keyStore.load(new FileInputStream(
new File("path/to/client.keystore.p12")),
"secret".toCharArray());
final KeyStore trustStore = KeyStore.getInstance("JKS");
trustStore.load(new FileInputStream(
new File("path/to/client.truststore.jks")),
"secret".toCharArray());
TrustStrategy acceptingTrustStrategy = (X509Certificate[] chain, String authType) -> true;
SSLContext sslContext = SSLContextBuilder
.create()
.loadKeyMaterial(keyStore, "secret".toCharArray())
.loadTrustMaterial(trustStore, acceptingTrustStrategy)
.build();
HttpClient httpClient = HttpClients.custom().setSSLContext(sslContext).build();
ClientHttpRequestFactory requestFactory = new HttpComponentsClientHttpRequestFactory(
httpClient);
ConfluentSchemaRegistryClient schemaRegistryClient = new ConfluentSchemaRegistryClient(
new RestTemplate(requestFactory));
schemaRegistryClient.setEndpoint(endpoint);
return schemaRegistryClient;
} catch (Exception ex) {
ex.printStackTrace();
return null;
}
}
Это помогло избавиться от ошибки при запуске приложения и зарегистрировало схему. Однако всякий раз, когда приложение хотело отправить сообщение в Kafka, снова выдавалась новая ошибка. Наконец, это также было исправлено ответом mmelsen.
Комментарии:
1. Не могли бы вы поделиться всеми свойствами, которые вы добавили в приложение. файл yml?
2. @Milad Вы нашли решение для этого?
3. @mmelsen У меня был разговор с Али н. Я думаю, что он нашел решение. Он опубликует его, как только мы в этом убедимся.
Ответ №1:
Я столкнулся с той же проблемой, что и в ситуации, в которой я оказался, — подключиться к защищенному реестру схемы, размещенному в aiven и защищенному базовой аутентификацией. Для того чтобы заставить его работать, мне пришлось настроить следующие свойства:
spring.kafka.properties.schema.registry.url=https://***.aiven***.com:port
spring.kafka.properties.basic.auth.credentials.source=USER_INFO
spring.kafka.properties.basic.auth.user.info=username:password
другими свойствами моего связующего являются:
spring.cloud.stream.binders.input.type=kafka
spring.cloud.stream.binders.input.environment.spring.cloud.stream.kafka.binder.brokers=https://***.aiven***.com:port <-- different from the before mentioned port
spring.cloud.stream.binders.input.environment.spring.cloud.stream.kafka.binder.configuration.security.protocol=SSL
spring.cloud.stream.binders.input.environment.spring.cloud.stream.kafka.binder.configuration.ssl.truststore.location=truststore.jks
spring.cloud.stream.binders.input.environment.spring.cloud.stream.kafka.binder.configuration.ssl.truststore.password=secret
spring.cloud.stream.binders.input.environment.spring.cloud.stream.kafka.binder.configuration.ssl.keystore.type=PKCS12
spring.cloud.stream.binders.input.environment.spring.cloud.stream.kafka.binder.configuration.ssl.keystore.location=clientkeystore.p12
spring.cloud.stream.binders.input.environment.spring.cloud.stream.kafka.binder.configuration.ssl.keystore.password=secret
spring.cloud.stream.binders.input.environment.spring.cloud.stream.kafka.binder.configuration.ssl.key.password=secret
spring.cloud.stream.binders.input.environment.spring.cloud.stream.kafka.binder.configuration.value.deserializer=io.confluent.kafka.serializers.KafkaAvroDeserializer
spring.cloud.stream.binders.input.environment.spring.cloud.stream.kafka.streams.binder.autoCreateTopics=false
что на самом деле происходит, так это то, что Spring cloud stream добавит spring.kafka.properties.basic* в DefaultKafkaConsumerFactory, и это добавит конфигурацию в KafkaConsumer. В какой-то момент во время инициализации spring kafka создается CachedSchemaRegistryClient, который снабжен этими свойствами. Этот клиент содержит метод с именем configureRestService, который проверяет, содержит ли карта свойств «basic.auth.credentials.source». Поскольку мы предоставляем это через spring.kafka.properties, он найдет это свойство и позаботится о создании соответствующих заголовков при доступе к конечной точке реестра схемы.
надеюсь, это сработает и для вас.
Я использую версию spring cloud по Гринвичу.SR1, spring-boot-starter 2.1.4.RELEASE, avro-версия 1.8.2 и confluent.версия 5.2.1
Комментарии:
1. если вы используете это, можете ли вы также сказать мне, как я могу обеспечить, чтобы схема извлекалась из реестра вместо использования local?
Ответ №2:
Конфигурация binder обрабатывает только хорошо известные свойства потребителя и производителя.
Вы можете установить произвольные свойства на уровне привязки.
spring.cloud.stream.kafka.binding.<binding>.consumer.configuration.basic.auth...
Комментарии:
1. переместил параметры в раздел «spring.cloud.stream.kafka.bindings.output.producer.configuration», но ошибка stacktrace не изменилась
Ответ №3:
Поскольку Aiven использует SSL для протокола безопасности Kafka, для аутентификации требуется использовать сертификаты.
Вы можете перейти на эту страницу, чтобы понять, как это работает. В двух словах, вам нужно выполнить следующую команду для создания сертификатов и их импорта:
openssl pkcs12 -export -inkey service.key -in service.cert -out client.keystore.p12 -name service_key
keytool -import -file ca.pem -alias CA -keystore client.truststore.jks
Затем вы можете использовать следующие свойства для использования сертификатов:
spring.cloud.stream.kafka.streams.binder:
configuration:
security.protocol: SSL
ssl.truststore.location: client.truststore.jks
ssl.truststore.password: secret
ssl.keystore.type: PKCS12
ssl.keystore.location: client.keystore.p12
ssl.keystore.password: secret
ssl.key.password: secret
key.serializer: org.apache.kafka.common.serialization.StringSerializer
value.serializer: org.apache.kafka.common.serialization.StringSerializer
Комментарии:
1. это отлично работает для самой Kafka, но не помогает с сервером реестра schema