Как десериализовать перечисление Avro в языке SQL Flink?

#apache-kafka #apache-flink #avro #flink-sql

Вопрос:

У меня есть тема Кафки со следующим Avro IDL и зарегистрирована в реестре схем.

     @namespace("my.type.avro")
    protocol MyProtocol {
      enum MyEnumType {
       Type1, Type2
      }

      record MyEntry {
         MyEnumType myEntryType = "Type1";
      }

      record MyRecord {
          MyEntry entry;
      }
    }
 

Чтобы прочитать из этой темы, я определил следующий DDL:

     CREATE TABLE my_table

    (
      `entry` ROW(`myEntryType` ROW(???))
     ) WITH (
         'connector' = 'kafka',
         'topic' = 'my-topic',
         'properties.bootstrap.servers' = '...:9092',
         'scan.startup.mode' = 'latest-offset',
         'value.format' = 'avro-confluent',
         'value.avro-confluent.schema-registry.url' = 'http://...:8081'
    )
 

И я выполняю следующий запрос :

     SELECT * FROM my_table
 

Теперь я получил следующие сообщения в Flink-1.13.1, когда я использую СТРОКУ для
типа:

      *Caused by: java.io.IOException: Failed to deserialize Avro record.*
       at
     org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:106)
       at
     org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:46)
       at
     org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:82)
       at
     org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaDeserializationSchema.deserialize(DynamicKafkaDeserializationSchema.java:113)
       at
     org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:179)
       at
     org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.runFetchLoop(KafkaFetcher.java:142)
       at
     org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:826)
       at
     org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
       at
     org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66)
       at
     org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:269)
     *Caused by: org.apache.avro.AvroTypeException: Found
     my.type.avro.MyEnumType, expecting union*
       at
     org.apache.avro.io.ResolvingDecoder.doAction(ResolvingDecoder.java:308)
       at org.apache.avro.io.parsing.Parser.advance(Parser.java:86)
       at
     org.apache.avro.io.ResolvingDecoder.readIndex(ResolvingDecoder.java:275)
       at
     org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:1
     ...
 

Я пробовал *RAW('class','snapshot') , где находится «класс» my.type.avro.MyEnumType , но не могу найти подходящий сериализатор моментальных снимков. Пробовал кучу из них, например, PojoSerializerSnapshot, KryoSerializer.KryoSerializerConfigSnapshot, StringSerializer, AvroSerializer и т. Д., Ни один из которых не работал.

Комментарии:

1. В настоящее время в DDL невозможно использовать все типы данных внешних систем. Я бы рекомендовал использовать соединитель API потока данных и использовать StreamTableEnvironment.fromDataStream его для выполнения преобразования в SQL с необработанными типами.

Ответ №1:

Правда, в настоящее время обходной путь заключается в использовании API потока данных для чтения данных и предоставления пользовательской схемы Avro для настройки формата. Затем переключитесь на табличный API, как указано в следующем потоке : https://www.mail-archive.com/user@flink.apache.org/msg44520.html

Кроме того, для дальнейшей поддержки перечисления через табличный API открывается следующий билет jira: https://issues.apache.org/jira/browse/FLINK-24544