Отображение Apache Flink во время выполнения

#java #apache-flink #flink-streaming

#java #apache-flink #потоковая передача flink

Вопрос:

я создал потоковое задание flink для чтения XML-файла из kafka, преобразования файла и записи его в базу данных. Поскольку атрибуты в файле xml не соответствуют именам столбцов базы данных, я создал вариант переключения для сопоставления.

Поскольку это не очень гибко, я хочу удалить эту информацию о встроенном отображении из кода. Прежде всего, мне пришла в голову идея файла сопоставления, который мог бы выглядеть следующим образом:

 path.in.xml.to.attribut=database.column.name
  

Текущая логика задания выглядит следующим образом:

 switch(path.in.xml.to.attribute){
    case "example.one.name":
        return "name";
  

С файлом сопоставления, я думаю, я бы работал с картой для хранения данных сопоставления в виде пары ключ-значение.

Это сделало бы работу более гибкой, как сейчас. Тем не менее недостатком было бы то, что для каждого изменения в этой конфигурации, которое я хочу применить, мне пришлось бы перезапускать задание flink.

Мой вопрос в том, возможно ли внедрить такую логику отображения во время выполнения, например, через собственную тему kafka. И когда такая реализация возможна, как это может выглядеть в качестве примера.

Ответ №1:

Если единственное, что вам нужно, это иметь возможность обновлять сопоставление между атрибутами xml и именами столбцов базы данных, тогда можно использовать шаблон состояния широковещательной передачи. Также полезно практическое руководство по состоянию трансляции в Apache Flink.

Идея состоит в том, чтобы иметь поток, подписанный на вашу собственную тему kafka с сопоставлениями базы данных, который транслирует обновления всем менеджерам задач. Эти операторы будут поддерживать это Map<String, String> как состояние, и вы можете использовать это состояние сопоставления для разрешения имени столбца, т. Е. Вместо switch(path.in.xml.to.attribute) использования map.get(path.in.xml.to.attribute)) . map Оператор в этом случае должен быть заменен на BroadcastProcessFunction .

Комментарии:

1. О, это звучит хорошо, даже если я сейчас не понимаю, как взаимодействовать с моим POJO, который содержит все узлы xml на карте, но я думаю, это то, что я могу задать в новом вопросе, поскольку это кажется «правильным» ответом. Большое спасибо