Как обработать исключение NullPointerException при чтении, фильтрации и подсчете строк CSV-файлов с помощью SparkSession?

#apache-spark #apache-spark-sql #apache-spark-dataset

Вопрос:

Я пытаюсь прочитать CSV-файлы, хранящиеся в HDFS, с помощью sparkSession , подсчитать количество строк и распечатать значение на консоли. Тем не менее, я постоянно получаю NullPointerException при подсчете счет. Ниже приведен фрагмент кода,

 val validEmployeeIds = Set("12345", "6789")

val count =  sparkSession
    .read
    .option("escape", """)
    .option("quote", """)
    .csv(inputPath)
    .filter(row => validEmployeeIds.contains(row.getString(0)))
    .distinct()
    .count()

println(count)
 

Я получаю NPE точно в .filter таком состоянии. Если я удалю .filter код, он будет работать нормально и выведет счетчик. Как я могу справиться с этим NPE?

Это inputPath папка, содержащая несколько файлов CSV. Каждый CSV-файл содержит два столбца, один из которых представляет идентификатор, а другой-имя сотрудника. Ниже приведен пример извлечения CSV:

 12345,Employee1
AA888,Employee2
 

Я использую Spark версии 2.3.1.

Комментарии:

1. Можете ли вы добавить выписку из csv-файла, который вы пытаетесь прочитать ?

2. Добавлены сведения о извлечении образца csv

Ответ №1:

Попробуйте использовать isin функцию.

 import spark.implicits._

val validEmployeeIds = List("12345", "6789")

val df =  // Read CSV

df.filter('_c0.isin(validEmployeeIds:_*)).distinct().count()