#java #mapreduce #apache-flink #bigdata
#java #mapreduce создать #апачи-флинк #большие данные
Вопрос:
Я пытаюсь выполнить LeftOuterJoin в Flink. Я не пытаюсь реализовать leftOuterJoin самостоятельно, как это делается с помощью функции CoGroupFunction здесь: https://gist.github.com/mxm/c2e9c459a9d82c18d789
Я пытаюсь использовать функцию FlatJoinFunction:
public static final class leftOuter implements FlatJoinFunction<Tuple3<String,String,String>, Tuple2<String,String>, Tuple2<String,String>>{
@Override
public void join(Tuple3<String, String, String> in1,
Tuple2<String, String> in2,
Collector<Tuple2<String, String>> out) throws Exception {
// TODO Auto-generated method stub
out.collect(new Tuple2<String,String>(in1.f0, in2.f1 == null ? "null" : in2.f1));
}
}
Я вызываю эту функцию следующим образом:
input1.leftOuterJoin(input2).where(0)
.equalTo(1)
.with(new leftOuter());
К сожалению, я получаю исключение NullPointerException в строке out.collect.
Заранее благодарим вас за помощь!
Ответ №1:
Это ожидаемое поведение левого внешнего соединения.
Учитывая вашу программу, левое внешнее соединение вызывает JoinFunction
в двух случаях:
- если оба входа,
input1
иinput2
, имеют записи с одним и тем же ключом соединения,join()
вызывается для каждого элемента декартова произведения этого ключа. - если левый ввод,
input1
, содержит записи с ключом, которого нет в правом вводе (input2
) ,join()
вызывается для каждой записи с этим ключомinput1
иnull
для правого ввода.
Вы должны добавить проверку для in2 == null
вашего JoinFunction
.
Комментарии:
1. Спасибо! Я думаю, что это сработало (нужно проверить результаты). Код выглядит следующим образом: pastebin.com/SjB27Qyt сейчас же.