#scala #apache-spark
#scala #apache-spark
Вопрос:
Я знаю, что вопрос не является чем-то новым, но мой контекст отличается.Я пытался наилучшим образом эффективно обрабатывать исключения.Ниже приведен мой экспериментальный код
val in = List("1", "2", "3", "abc")
val sc = prepareConfig();
val rdd = sc.parallelize(in)
val mapRDD = rdd.map { x => Try { x.toInt } }
val successRDD = mapRDD.filter { x => x.isSuccess }
val successRDD2 =
successRDD.map { x => Try { x.get * x.get } }.filter{ _.isSuccess }
.
. // some more Transformation operations based on the use case
.
successRDD10.collect()
Предположим, что мне может потребоваться 10 преобразований, которые приведут к конечному результату. Должен ли я использовать Try{} для всех моих преобразований? Ожидается, что я могу получить ошибку в любом из 10 преобразований.Итак, я пытаюсь отфильтровать успешные результаты и передать их на следующий этап. итак, рекомендуется ли приведенный выше эффективный способ обработки исключений или какой-либо другой альтернативный подход?
Ответ №1:
Лучший способ:
val res: RDD[Int] = rdd.flatMap(x => Try(x.toInt).map(i => i * i).toOption)
Это действительно делает то, чего вы пытаетесь достичь. Как?
- Каждый
String
из них преобразуется вInt
:Try(x.toInt)
- Если преобразование выполнено успешно, целое число умножается само на себя:
map(i => i * i)
- Результат
Try[Int]
преобразуется вOption[Int]
то естьNone
, если значениеTry[Int]
равно aFailure
иSome(value)
в противном случае:toOption
- Благодаря
flatMap
операцииOption[Int]
в результирующем файле остаются только те, которые определены (т.е. НетNone
, поэтому преобразование прошло успешно).RDD[Int]
Нет необходимости в дополнительных фильтрах и isSuccess
вызовах.
Комментарии:
1. Что делать, если вы хотите использовать DataFrames и его выражение sql, и есть такое исключение во время выполнения из-за данных. Есть ли способ справиться с тем же?