#apache-flink #flink-streaming #flink-cep
#apache-flink #flink-streaming #flink-cep
Вопрос:
У меня есть задание Apache Flink с 4 входными потоками данных (сообщениями JSON) из отдельных разделов Apache Kafka, и у меня есть только один объект XFilterFunction, который выполняет некоторую фильтрацию. Я написал несколько логик конвейера данных (для примитивного примера):
FilterFunction<MyEvent> xFilter = new XFilterFunction();
inputDataStream1.filter(xFilter)
.name("Xfilter")
.uid("Xfilter");
inputDataStream2
.union(inputDataStream3)
//here some logics (map, process,...)
.filter(xFilter);
Хорошая или плохая практика использовать один новый объект XFilterFunction в Job?
Или лучше использовать два новых объекта XFilterFunction? (2 потока -> 2 новых объекта фильтра)
Ответ №1:
Если вы создаете экземпляр класса несколько раз, т.е.
inputDataStream1.filter(new XFilterFunction());
...
inputDataStream2.filter(new XFilterFunction());
проблем быть не должно. Я не уверен, что в противном случае такие вещи, как состояние или переопределенные контекстные функции, будут показывать нежелательное поведение.
В случае, если это не специализация RichFunction
, возможно, даже просто вызов чистой функции происходит через делегатов, к сожалению, я не настолько глубоко разбираюсь во внутренних функциях Flink, чтобы сказать, но с решением выше вы должны быть в безопасности.