#python #hadoop #apache-spark #pyspark
#python #hadoop #apache-spark #pyspark
Вопрос:
Я новичок в Spark, но у меня есть некоторый опыт работы с Hadoop. Я пытаюсь адаптировать код python, который я использую в потоковой передаче Hadoop, который отфильтровывает некоторые твиты в формате JSON.
Обычно моя функция имеет условие, которое выводит стандартный вывод твита, если условие истинно, и ничего не выводит в противном случае.
def filter(tweet):
if criteria(tweet) is True:
print json.dumps(tweet)
Таким образом, конечный выходной файл будет содержать только те твиты, которые я хочу.
Однако при попытке использовать Spark мне пришлось изменить print
оператор на return
so Я возвращаю твит, если условие истинно, и None
в противном случае.
def filter(tweet):
if criteria(tweet) is True:
return json.dumps(tweet)
Проблема возникает при попытке сохранить результаты на диск. Используя saveAsTextFile
метод Pyspark, он сохраняет не только твиты, которые я хочу, но и те None
, которые я возвращаю, когда условие ложно.
Как я могу избежать записи None
в файл, чтобы сохранить только нужные твиты?
Заранее большое спасибо.
Хорхе
Комментарии:
1. Почему вы не можете просто вернуть «» ? Если это не сработает, вы не можете выполнить простую постобработку для возвращенного дампа json?
2. Привет, free_mind. Это хороший момент, но не собирается ли он вместо этого печатать пустую строку ?….. Попробую на всякий случай. Спасибо за ваш ответ.
Ответ №1:
Довольно элегантное решение, которое позволяет избежать цепочки filter
и map
заключается в использовании flatMap
:
def filter(tweet):
return [json.dumps(tweet)] if criteria(tweet) is True else []
some_rdd.flatMap(filter)
Ответ №2:
Если вы используете свою функцию в map, это не уменьшит количество имеющихся у вас элементов. Чтобы фильтровать элементы, вы должны использовать filter
метод, чтобы проверить, является ли элемент None
после вас map
.