Фильтрация JavaRDD в трех RDD?

#apache-spark #rdd

#apache-spark #rdd

Вопрос:

Я хочу отфильтровать JavaRDD для трех разных RDD на основе определенного условия.Прямо сейчас я трижды читаю один и тот же rdd и фильтрую его.Есть ли какой-либо другой эффективный способ добиться этого за одно сканирование?

 Example:

Like I have an rdd of type string and I want to filter it based on name 'anshu','suman' and 'neeraj'

rdd1=rdd.filter(s->{s.contains("anshu")?return true; else return false;})
rdd2=rdd.filter(s->{s.contains("suman")?return true; else return false;})
rdd3=rdd.filter(s->{s.contains("neeraj")?return true; else return false;})

Instead of filtering same rdd thrice,can I do it in single filter?
  

Комментарии:

1. Можете ли вы предоставить свой вариант использования. Это поможет в ответе. Например, каков ваш ввод и чего вы ожидаете.

2. @cody123-добавлен пример

3. @cody123 -спасибо, это даст вам один rdd, но я хочу, чтобы три разных rdd типа anshu, suman и neeraj выполнили с ними некоторые дополнительные операции.

4. Далее вы можете выполнить операцию над результирующим rdd на основе ключей.

5. Если мне нужно будет выполнить дальнейшую операцию с anshu, у меня сейчас нет ключа, можете ли вы привести какой-нибудь примерный пример, как этого добиться?

Ответ №1:

Вы можете проверить на примере ниже. Здесь я использую map, где все ваши три условия будут вести себя как ключ, и мы можем использовать reduce для группирования значений, связанных с этими ключами.

 JavaRDD<List<String>> rdd = javaSparkContext.textFile("/tmp/mathsetdata.dat").filter(new Function<String, Boolean>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Boolean call(String v1) throws Exception {
                String split[] = v1.split(" ");
                return split[0].equals("suman") || split[0].equals("anshu") || split[0].equals("neeraj");
            }
        }).mapToPair(new PairFunction<String, String, List<String>>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Tuple2<String, List<String>> call(String t) throws Exception {
                String split[] = t.split(" ");
                List<String> list = new ArrayList<String>();
                list.add(split[1].trim());
                return new Tuple2<String, List<String>>(split[0].trim(), list);
            }
        }).reduceByKey(new Function2<List<String>, List<String>, List<String>>() {
            private static final long serialVersionUID = 1L;
            @Override
            public List<String> call(List<String> v1, List<String> v2) throws Exception {
                List<String> list = new ArrayList<String>();
                list.addAll(v1);
                list.addAll(v2);
                return list;
            }
        }).values();
  

Пример файла :

 suman 1001
anshu 1002
neeraj 1003
suman 1006
anshu 1007
neeraj 1008
suman 1016
anshu 1027
neeraj 1018
  

Также могут быть выполнены дополнительные операции.например.

 Tuple2<String, Integer> rdds = rdd.filter(new Function<Tuple2<String, List<String>>, Boolean>() {

            private static final long serialVersionUID = 1L;

            @Override
            public Boolean call(Tuple2<String, List<String>> v1) throws Exception {
                return v1._1.equals("suman");
            }
        }).map(new Function<Tuple2<String, List<String>>, Tuple2<String, Integer>>() {

            private static final long serialVersionUID = 1L;

            @Override
            public Tuple2<String, Integer> call(Tuple2<String, List<String>> v1) throws Exception {
                Integer sum = 0;
                for (String str : v1._2) {
                    sum  = Integer.parseInt(str);
                }
                return new Tuple2<String, Integer>(v1._1, sum);
            }
        }).collect().get(0);