LeftOuterJoin в Flink (JAVA API)

#java #mapreduce #apache-flink #bigdata

#java #mapreduce создать #апачи-флинк #большие данные

Вопрос:

Я пытаюсь выполнить LeftOuterJoin в Flink. Я не пытаюсь реализовать leftOuterJoin самостоятельно, как это делается с помощью функции CoGroupFunction здесь: https://gist.github.com/mxm/c2e9c459a9d82c18d789

Я пытаюсь использовать функцию FlatJoinFunction:

     public static final class leftOuter implements FlatJoinFunction<Tuple3<String,String,String>, Tuple2<String,String>, Tuple2<String,String>>{


    @Override
    public void join(Tuple3<String, String, String> in1,
            Tuple2<String, String> in2,
            Collector<Tuple2<String, String>> out) throws Exception {
        // TODO Auto-generated method stub
        out.collect(new Tuple2<String,String>(in1.f0, in2.f1 == null ? "null" : in2.f1));

    }

}
 

Я вызываю эту функцию следующим образом:

         input1.leftOuterJoin(input2).where(0)
            .equalTo(1)
            .with(new leftOuter());
 

К сожалению, я получаю исключение NullPointerException в строке out.collect.

Заранее благодарим вас за помощь!

Ответ №1:

Это ожидаемое поведение левого внешнего соединения.

Учитывая вашу программу, левое внешнее соединение вызывает JoinFunction в двух случаях:

  1. если оба входа, input1 и input2 , имеют записи с одним и тем же ключом соединения, join() вызывается для каждого элемента декартова произведения этого ключа.
  2. если левый ввод, input1 , содержит записи с ключом, которого нет в правом вводе ( input2 ) , join() вызывается для каждой записи с этим ключом input1 и null для правого ввода.

Вы должны добавить проверку для in2 == null вашего JoinFunction .

Комментарии:

1. Спасибо! Я думаю, что это сработало (нужно проверить результаты). Код выглядит следующим образом: pastebin.com/SjB27Qyt сейчас же.