#java #apache-flink #flink-streaming
#java #apache-flink #потоковая передача flink
Вопрос:
я создал потоковое задание flink для чтения XML-файла из kafka, преобразования файла и записи его в базу данных. Поскольку атрибуты в файле xml не соответствуют именам столбцов базы данных, я создал вариант переключения для сопоставления.
Поскольку это не очень гибко, я хочу удалить эту информацию о встроенном отображении из кода. Прежде всего, мне пришла в голову идея файла сопоставления, который мог бы выглядеть следующим образом:
path.in.xml.to.attribut=database.column.name
Текущая логика задания выглядит следующим образом:
switch(path.in.xml.to.attribute){
case "example.one.name":
return "name";
С файлом сопоставления, я думаю, я бы работал с картой для хранения данных сопоставления в виде пары ключ-значение.
Это сделало бы работу более гибкой, как сейчас. Тем не менее недостатком было бы то, что для каждого изменения в этой конфигурации, которое я хочу применить, мне пришлось бы перезапускать задание flink.
Мой вопрос в том, возможно ли внедрить такую логику отображения во время выполнения, например, через собственную тему kafka. И когда такая реализация возможна, как это может выглядеть в качестве примера.
Ответ №1:
Если единственное, что вам нужно, это иметь возможность обновлять сопоставление между атрибутами xml и именами столбцов базы данных, тогда можно использовать шаблон состояния широковещательной передачи. Также полезно практическое руководство по состоянию трансляции в Apache Flink.
Идея состоит в том, чтобы иметь поток, подписанный на вашу собственную тему kafka с сопоставлениями базы данных, который транслирует обновления всем менеджерам задач. Эти операторы будут поддерживать это Map<String, String>
как состояние, и вы можете использовать это состояние сопоставления для разрешения имени столбца, т. Е. Вместо switch(path.in.xml.to.attribute)
использования map.get(path.in.xml.to.attribute))
. map
Оператор в этом случае должен быть заменен на BroadcastProcessFunction
.
Комментарии:
1. О, это звучит хорошо, даже если я сейчас не понимаю, как взаимодействовать с моим POJO, который содержит все узлы xml на карте, но я думаю, это то, что я могу задать в новом вопросе, поскольку это кажется «правильным» ответом. Большое спасибо