Flink docker compose — пользовательская библиотека

#java #docker-compose #cluster-computing #apache-flink #classpath

#java #docker-compose #кластерные вычисления #apache-flink #путь к классу

Вопрос:

Я пытаюсь настроить кластер сеансов Flink с помощью docker-compose. Я хотел бы иметь пользовательскую библиотеку, загруженную в Flink, поскольку эта библиотека содержит код, который используется во всех моих заданиях. Я делаю это, создавая пользовательский образ docker следующим образом:

 FROM flink:1.10.0
WORKDIR /opt/flink/lib

RUN mkdir /opt/flink/usrlib
RUN chown flink:flink /opt/flink/usrlib

ADD --chown=flink:flink ./myLibrary.jar /opt/flink/lib/myLibary.jar
  

Менеджеры заданий / задач запускаются успешно. Когда я отправляю задание с помощью веб-интерфейса, моя работа выполняется правильно, за одним исключением:

В моей библиотеке у меня есть оператор отображения flink (называемый DeserialisationMapper), который использует сообщения JSON из Kafka и создает пользовательские объекты Java на основе тега в сообщении. Например, если сообщение

 {"objectType": "Address", "street": "Street 1"}
  

мой DeserialisationMapper создает Java POJO, экземпляр класса Address, для поля с именем «street» установлено значение «Street 1». Я делаю это, используя отражение Java. Пользовательские классы Java для POJO доступны только в самом задании (не в библиотеке). Когда я выполняю свою программу в своем Eclipse (моя пользовательская библиотека предоставляется как зависимость Maven), все работает нормально. Программа DeserialisationMapper может находить пользовательские классы Java, расположенные в проекте задания. Когда я экспортирую «толстый» jar для задания, это jar, который включает в себя все зависимости задания (например, myLibrary.jar ) и разверните его в кластере flink, он тоже работает нормально. Но, когда я пытаюсь поместить свою библиотеку в кластер flink (используя пользовательское изображение, показанное выше) и исключить ее из job jar, я получаю ClassNotFoundException, запрашивающее, что конкретный класс (например, адрес) не может быть найден, хотя путь к этому классу выглядит правильно (например, org.eclipse.MyJob.типы данных.Адрес) — я подтвердил, что класс находится в банке заданий в правильном месте. ПРИМЕЧАНИЕ.: моя работа может фактически получать доступ к методам в myLibrary.jar как, например, потребитель Kafka создается в методе в myLibrary.jar что вызвано моей работой).

Почему это происходит? Не следует myLibrary.jar сможете ли вы найти классы, включенные в мой список заданий? Должен ли я сделать какую-либо конкретную конфигурацию или это вообще невозможно?

Ответ №1:

Нашел решение сам. В соответствии с этим классы из пользовательских jar в кластере сеансов загружаются динамически, поэтому к ним не могут быть доступны библиотеки, которые загружаются в classpath Flink. Одним из решений является помещение пользовательских jars в папку lib, что для меня не работает, поскольку я хочу, чтобы мои пользователи могли отправлять свои задания через пользовательский интерфейс. Другое решение, которое работает для меня, описано в этом подразделе. В принципе, когда требуются классы из пользовательских jar, ваши операторы Flink должны создавать экземпляр classloader с помощью getRuntimeContext().getUserCodeClassLoader() . Для этого они должны быть расширенными функциями (например, RichFlatMapFunction ). Затем, используя этот загрузчик классов, вы можете вызвать loadClass(className) метод, указывающий на путь, где находится пользовательский класс.