#apache-flink
#apache-flink
Вопрос:
Я столкнулся с проблемой при тестировании потока в мини-кластере flink в моем интеграционном тесте. Поток отображает сгенерированный класс Pojo Avro SpecificRecord (Java).
Потоковое задание написано на Scala.
Время выполнения flink завершается сбоем, поскольку оно не может создать экземпляр org.apache.flink.formats.avro.utils.AvroKryoSerializerUtils
Вот трассировка стека:
stack: java.lang.ClassCastException: class org.apache.flink.formats.avro.utils.AvroKryoSerializerUtils
java.lang.RuntimeException: Could not instantiate org.apache.flink.formats.avro.utils.AvroKryoSerializerUtils.
at org.apache.flink.api.java.typeutils.AvroUtils.getAvroUtils(AvroUtils.java:53)
at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.buildKryoRegistrations(KryoSerializer.java:572)
Я думаю, проблема в том, что Flink не может сериализовать класс Avro Pojo, потому что в классе есть несколько вложенных классов Avro Pojo.
Я попытался добавить всю информацию о типах для всех вложенных типов классов Pojo, но все равно столкнулся с той же проблемой.
Итак, теперь мне интересно, заставил ли кто-нибудь задание Flink работать с сгенерированным классом Avro Pojo с вложенными классами Avro Pojo. Все классы наследуют тип SpecificRecord и генерируются из схемы avro.
Есть ли какой-то специальный сериализатор, который нужно написать? Есть ли какая-либо документация или пример для такого сериализатора, который имеет дело с несколькими вложенными классами Pojo в Scala или Java?
Или это совсем другая проблема?
Заранее большое спасибо за любую помощь!
Ответ №1:
Проблема может возникнуть, если flink-avro
не указан путь к классу. Если вы все равно используете Avro, я бы полностью отключил Kryo, чтобы отлавливать более тонкие ошибки.
Комментарии:
1. Спасибо за ваш комментарий. Это то, что я проверил в первую очередь. Пожалуйста, посмотрите мой ответ, где я упоминаю, в чем была проблема.
Ответ №2:
Я заставил его работать, выполнив синтаксический анализ внутри функции процесса.
Мне пришлось разобрать строку в json, а затем в класс записи для одного конкретного поля класса SpecificRecord, которое должно было оказаться в DataSink.
Синтаксический анализ json теперь реализован в другом ProcessFuncton, и теперь он работает. До этого у меня был синтаксический анализ на карте, непосредственно примененный к потоку данных.