Запуск потока в мини-кластере Flink (1.11) и AvroKryoSerializerUtils не работают

#apache-flink

#apache-flink

Вопрос:

Я столкнулся с проблемой при тестировании потока в мини-кластере flink в моем интеграционном тесте. Поток отображает сгенерированный класс Pojo Avro SpecificRecord (Java).

Потоковое задание написано на Scala.

Время выполнения flink завершается сбоем, поскольку оно не может создать экземпляр org.apache.flink.formats.avro.utils.AvroKryoSerializerUtils

Вот трассировка стека:

 stack: java.lang.ClassCastException: class org.apache.flink.formats.avro.utils.AvroKryoSerializerUtils
java.lang.RuntimeException: Could not instantiate org.apache.flink.formats.avro.utils.AvroKryoSerializerUtils.
    at org.apache.flink.api.java.typeutils.AvroUtils.getAvroUtils(AvroUtils.java:53)
    at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.buildKryoRegistrations(KryoSerializer.java:572)
  

Я думаю, проблема в том, что Flink не может сериализовать класс Avro Pojo, потому что в классе есть несколько вложенных классов Avro Pojo.

Я попытался добавить всю информацию о типах для всех вложенных типов классов Pojo, но все равно столкнулся с той же проблемой.

Итак, теперь мне интересно, заставил ли кто-нибудь задание Flink работать с сгенерированным классом Avro Pojo с вложенными классами Avro Pojo. Все классы наследуют тип SpecificRecord и генерируются из схемы avro.

Есть ли какой-то специальный сериализатор, который нужно написать? Есть ли какая-либо документация или пример для такого сериализатора, который имеет дело с несколькими вложенными классами Pojo в Scala или Java?

Или это совсем другая проблема?

Заранее большое спасибо за любую помощь!

Ответ №1:

Проблема может возникнуть, если flink-avro не указан путь к классу. Если вы все равно используете Avro, я бы полностью отключил Kryo, чтобы отлавливать более тонкие ошибки.

Комментарии:

1. Спасибо за ваш комментарий. Это то, что я проверил в первую очередь. Пожалуйста, посмотрите мой ответ, где я упоминаю, в чем была проблема.

Ответ №2:

Я заставил его работать, выполнив синтаксический анализ внутри функции процесса.

Мне пришлось разобрать строку в json, а затем в класс записи для одного конкретного поля класса SpecificRecord, которое должно было оказаться в DataSink.

Синтаксический анализ json теперь реализован в другом ProcessFuncton, и теперь он работает. До этого у меня был синтаксический анализ на карте, непосредственно примененный к потоку данных.