#apache-spark #rdd
#apache-spark #rdd
Вопрос:
Я хочу отфильтровать JavaRDD для трех разных RDD на основе определенного условия.Прямо сейчас я трижды читаю один и тот же rdd и фильтрую его.Есть ли какой-либо другой эффективный способ добиться этого за одно сканирование?
Example:
Like I have an rdd of type string and I want to filter it based on name 'anshu','suman' and 'neeraj'
rdd1=rdd.filter(s->{s.contains("anshu")?return true; else return false;})
rdd2=rdd.filter(s->{s.contains("suman")?return true; else return false;})
rdd3=rdd.filter(s->{s.contains("neeraj")?return true; else return false;})
Instead of filtering same rdd thrice,can I do it in single filter?
Комментарии:
1. Можете ли вы предоставить свой вариант использования. Это поможет в ответе. Например, каков ваш ввод и чего вы ожидаете.
2. @cody123-добавлен пример
3. @cody123 -спасибо, это даст вам один rdd, но я хочу, чтобы три разных rdd типа anshu, suman и neeraj выполнили с ними некоторые дополнительные операции.
4. Далее вы можете выполнить операцию над результирующим rdd на основе ключей.
5. Если мне нужно будет выполнить дальнейшую операцию с anshu, у меня сейчас нет ключа, можете ли вы привести какой-нибудь примерный пример, как этого добиться?
Ответ №1:
Вы можете проверить на примере ниже. Здесь я использую map, где все ваши три условия будут вести себя как ключ, и мы можем использовать reduce для группирования значений, связанных с этими ключами.
JavaRDD<List<String>> rdd = javaSparkContext.textFile("/tmp/mathsetdata.dat").filter(new Function<String, Boolean>() {
private static final long serialVersionUID = 1L;
@Override
public Boolean call(String v1) throws Exception {
String split[] = v1.split(" ");
return split[0].equals("suman") || split[0].equals("anshu") || split[0].equals("neeraj");
}
}).mapToPair(new PairFunction<String, String, List<String>>() {
private static final long serialVersionUID = 1L;
@Override
public Tuple2<String, List<String>> call(String t) throws Exception {
String split[] = t.split(" ");
List<String> list = new ArrayList<String>();
list.add(split[1].trim());
return new Tuple2<String, List<String>>(split[0].trim(), list);
}
}).reduceByKey(new Function2<List<String>, List<String>, List<String>>() {
private static final long serialVersionUID = 1L;
@Override
public List<String> call(List<String> v1, List<String> v2) throws Exception {
List<String> list = new ArrayList<String>();
list.addAll(v1);
list.addAll(v2);
return list;
}
}).values();
Пример файла :
suman 1001
anshu 1002
neeraj 1003
suman 1006
anshu 1007
neeraj 1008
suman 1016
anshu 1027
neeraj 1018
Также могут быть выполнены дополнительные операции.например.
Tuple2<String, Integer> rdds = rdd.filter(new Function<Tuple2<String, List<String>>, Boolean>() {
private static final long serialVersionUID = 1L;
@Override
public Boolean call(Tuple2<String, List<String>> v1) throws Exception {
return v1._1.equals("suman");
}
}).map(new Function<Tuple2<String, List<String>>, Tuple2<String, Integer>>() {
private static final long serialVersionUID = 1L;
@Override
public Tuple2<String, Integer> call(Tuple2<String, List<String>> v1) throws Exception {
Integer sum = 0;
for (String str : v1._2) {
sum = Integer.parseInt(str);
}
return new Tuple2<String, Integer>(v1._1, sum);
}
}).collect().get(0);