Исключение RestClientException реестра схемы Kafka: неавторизованное; код ошибки: 401

#java #apache-kafka #confluent-schema-registry

#java #apache-kafka #confluent-schema-registry

Вопрос:

Я пытаюсь прочитать данные из раздела kafka avro, используя схему avro из реестра confluent client. Я использую io.confluent версию библиотеки 5.4.1 . Это запись в файле gradle

     compile (group: 'io.confluent', name: 'kafka-avro-serializer', version: '5.4.1') {
        exclude group: 'org.apache.avro', module: 'avro'
    }
 

Я получаю следующую ошибку.

 Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Unauthorized; error code: 401
 
 public PCollection<KV<String, GenericRecord>> apply(Pipeline p) {
       ConfluentSchemaRegistryDeserializerProvider<GenericRecord> valDeserializerProvider =
                ConfluentSchemaRegistryDeserializerProvider.of(params.schemaUrl, "topic-value");

        PCollection<KafkaRecord<String, GenericRecord>> records = p.apply("GetDataFromKafka", KafkaIO.<String, GenericRecord>read()
                .withBootstrapServers(params.apiHost)
                .withTopics("topic")
                .withConsumerConfigUpdates(params.getConsumerProps())
                .withKeyDeserializer(StringDeserializer.class)
                .withValueDeserializer(valDeserializerProvider)
                .commitOffsetsInFinalize());

        return records.apply("TopicAndDataInput", MapElements.via(new SimpleFunction<KafkaRecord<String, GenericRecord>, KV<String, GenericRecord>>() {
            @Override
            public KV<String, GenericRecord> apply(KafkaRecord<String, GenericRecord> input) {
                String topic = input.getTopic();
                GenericRecord data = input.getKV().getValue();
                return KV.of(topic, data);
            }
        }));
    }
 

Чего мне здесь не хватает? Может ли кто-нибудь указать мне правильное направление.
Заранее спасибо.

Это функция для получения потребительских свойств

     public Map<String, Object> getConsumerProps() {
        Map<String, Object> props = new HashMap<> ();

        props.put("group.id", groupId);
        props.put("auto.offset.reset", "earliest");
        props.put("retry.backoff.ms",500);
        props.put("max.partition.fetch.bytes", 8388608);
        props.put("basic.auth.credentials.source", "USER_INFO");
        props.put("basic.auth.user.info", "registry_key:secret");
        props.put("ssl.endpoint.identification.algorithm","https");
        props.put("security.protocol","SASL_SSL");
            
        props.put("sasl.jaas.config","org.apache.kafka.common.security.plain.PlainLoginModule required username='"  apiKey  "' password='"   apiSecret  "';");
        props.put("sasl.mechanism","PLAIN");
        return props;
    }
 

Пробовал также со следующими реквизитами и все равно получаю ту же несанкционированную ошибку.

 props.put("basic.auth.credentials.source", "USER_INFO");
props.put("schema.registry.basic.auth.user.info", "<registry key>:<value>");
props.put("schema.registry.url", schemaUrl);
 

Комментарии:

1. Unauthorized; error code: 401 довольно четко. Проверяли ли вы учетные данные реестра схемы напрямую, используя что-то вроде like curl ?

2. curl -s -u <registry_key:secret> GET https://<host_name>.confluent.cloud/subjects возвращает действительный ответ.

3. Вы импортировали ConfluentSchemaRegistryDeserializerProvider из Apache Beam и намеревались ли вы это сделать?

4. Да, точно. Я импортировал org.apache.beam.sdk.io.kafka. ConfluentSchemaRegistryDeserializerProvider;

5. Вы нашли решение для этого?

Ответ №1:

Раньше у меня была такая же проблема с библиотеками 5.3.0 . Я разрешил его обновление до

 'org.apache.avro:avro:1.10.2'
'io.confluent:kafka-schema-registry-client:6.2.0'
 

Я использую следующие реквизиты для подключения реестра схемы:

 "schema.registry.url": "<URL>"
"schema.registry.basic.auth.credentials.source": "USER_INFO"
"schema.registry.basic.auth.user.info": "<API_KEY>:<API_SECRET>"
 

Ответ №2:

Похоже, вам следует добавить конкретный раздел реестра схемы и секрет как

 props.put("schema.registry.basic.auth.user.info", "<SCHEMA_REGISTRY_API_KEY>:<SCHEMA_REGISTRY_API_SECRET>");
 

к свойствам. (из https://docs.confluent.io/cloud/current/cp-component/streams-cloud-config.html )

Комментарии:

1. У меня еще не было возможности попробовать это. Я попробую и обновлю, работает или нет. Спасибо.

2. Обновил вопрос с указанными вами свойствами, и он по-прежнему выдает ту же ошибку. Интересно, есть ли что-то большее, чем .withConsumerConfigUpdates передача этих свойств.

Ответ №3:

Обязательно используйте по крайней мере версию 5.0 библиотек реестра Confluent schema kafka-schema-registry amp; kafka-avro-serializer , поскольку более ранние версии не поддерживают аутентификацию.