Поддерживает ли Kafka Connect перечисления?

#apache-kafka #apache-kafka-connect

#apache-kafka #apache-kafka-connect

Вопрос:

Поддерживаются ли поля перечислений в Kafka Connect? Если нет, то каков обычный обходной путь? Я смотрю на API Kafka 2.6.0 ConnectSchema здесь: https://kafka.apache.org/26/javadoc/org/apache/kafka/connect/data/ConnectSchema.html

Я пытаюсь следовать рекомендациям, используя реестр Confluent Schema (с AVRO), но, похоже, не могу заставить свой пользовательский соединитель исходного кода генерировать схему, содержащую перечисления, соответствующие существующей схеме (в разделе вывода есть другие производители, помимо соединителя). Обходным путем было бы просто использовать строки, но это подрывает весь смысл схемы, не так ли?

Комментарии:

1. Перечисления в Avro преобразуются в строки в определенных соединителях приемника, но в AvroConverter есть свойство, называемое чем-то вроде расширенной поддержки avro, которое обрабатывает перечисления по-разному

2. Похоже, что «расширенная поддержка avro» — это специальный пользовательский код для разъема S3? Единственными ссылками, которые я смог найти, являются github.com/confluentinc/schema-registry/issues/1306 и docs.confluent.io/current/connect/kafka-connect-s3 /…

3. Это должно быть в конвертере Avro или десериализаторе

4. Похоже, большая часть логики здесь: github.com/confluentinc/schema-registry/blob/… . Поддержка кажется шаткой, так как эквивалентность схемы на основе документа и значений по умолчанию также может быть проблематичной: github.com/confluentinc/schema-registry/issues/1042

Ответ №1:

Не существует подхода, который работает в целом, но вы можете использовать специальную конфигурацию, специфичную для конвертера, в случае AVRO, а затем вы также должны предоставить специальные подсказки через свойства схемы поля перечисления. Я смог предоставить подсказки, используя пользовательское преобразование Connect.

Настройте конвертер Connect с помощью:

 "value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://registry:8081",
"value.converter.enhanced.avro.schema.support":  true,
"value.converter.connect.meta.data": false,
"transforms": "alarms",
"transforms.alarms.type": "org.jlab.kafka.connect.transforms.EpicsToAlarm$Value"
  

Затем пользовательское преобразование содержит:

     final Schema prioritySchema = SchemaBuilder
            .string()
            .doc("Alarm severity organized as a way for operators to prioritize which alarms to take action on first")
            .parameter("io.confluent.connect.avro.enum.doc.AlarmPriority", "Enumeration of possible alarm priorities")
            .parameter("io.confluent.connect.avro.Enum", "org.jlab.kafka.alarms.AlarmPriority")
            .parameter("io.confluent.connect.avro.Enum.1", "P1_LIFE")
            .parameter("io.confluent.connect.avro.Enum.2", "P2_PROPERTY")
            .parameter("io.confluent.connect.avro.Enum.3", "P3_PRODUCTIVITY")
            .parameter("io.confluent.connect.avro.Enum.4", "P4_DIAGNOSTIC")
            .build();
  

Полный источник преобразования

Комментарии:

1. расширенная ссылка.avro.support: docs.confluent.io/current/schema-registry/connect.html