#spring-kafka #spring-cloud-stream #confluent-schema-registry #spring-cloud-stream-binder-kafka
#spring-kafka #spring-cloud-stream #confluent-schema-registry #spring-cloud-stream-binder-kafka
Вопрос:
У меня есть проект, использующий Spring cloud stream со связующим Kafka Streams. Для вывода потока я использую Avro с Serde, предоставляемым Confluent( io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
) .
Я могу использовать его с реестром Confluent Schema. Сериализация и десериализация выполняются правильно. Однако я хотел посмотреть, можем ли мы использовать сервер реестра Spring Cloud Schema вместо Confluent. Я настроил автономный сервер реестра схем и установил для него реестр схем в моем проекте (изменил свойства schemaRegistryClient.endpoint
и schema.registry.url
).
Когда я попробовал это, кажется, что Spring Cloud может работать с автономным сервером. Он регистрирует схему, доступную в папке resources, как файл .avsc. Однако, когда я отправляю сообщение, кажется, что сериализатор Confluent продолжает обращаться к нему как к реестру Confluent Schema (который имеет разные конечные точки REST из реестра Spring Schema). В результате он получает код ответа 405.
Мы получаем следующее исключение (частичная трассировка стека)
org.apache.kafka.common.errors.SerializationException: Error registering Avro schema: <my-avro-schema>
Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Unexpected character ('<' (code 60)): expected a valid value (JSON String, Number, Array, Object or token 'null', 'true' or 'false')
at [Source: (sun.net.www.protocol.http.HttpURLConnection$HttpInputStream); line: 1, column: 2]; error code: 50005
at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:230)
Мне кажется, что есть две возможности:
- Сервер реестра Spring Schema может работать только с типом содержимого, предоставляемым Spring (указанным как
content-type: application/* avro
), а не с собственным Serde, предоставляемым Confluent, или - Существует проблема с конфигурацией проекта.
Может кто-нибудь помочь мне выяснить, что это такое? Если это второй, может кто-нибудь указать, что не так?
Ответ №1:
Для каждого поставщика реестра схем требуется собственная библиотека SerDe. Например, если вы хотите интегрировать реестр AWS Glue Schema с Kafka, вам понадобятся материалы Amazon SerDe. Следовательно, библиотека SerDe Confluent ожидает, что реестр схемы Confluent находится по адресу, указанному в schema.registry.url
свойстве.