Не удалось захватить исключение Pyspark UDF

#apache-spark #pyspark #apache-spark-sql #user-defined-functions

#apache-spark #pyspark #apache-spark-sql #определяемые пользователем функции

Вопрос:

Я пытаюсь проанализировать столбец, содержащий строку даты, используя dateparser через PysparkUDF.

Если синтаксический анализ завершается неудачно, хотелось бы передать пробел в столбец «date_field».

Хотя я использую «try-catch» для захвата ошибки атрибута, он не работает. Даже если блок except заканчивается получением ошибки ниже. Ошибка атрибута: объект ‘NoneType’ не имеет атрибута ‘date’

 import dateparser

        try:
            parse_date = udf(
                lambda z: dateparser.parse(z).date().strftime("%Y-%m-%d"),
                StringType(),
            )
            build_df = source_df.withColumn(
                "date_field",
                when(col(source_column_name).isNotNull(), to_date(parse_date(col(source_column_name)))).otherwise(" "),
            )
        except  AttributeError:
            build_df = source_df.withColumn("date_field", lit(" "))
        return build_df
 

Ответ №1:

Две вещи:

  1. Spark использует отложенную оценку, поэтому функция не вычисляется, пока вы не вызовете что-то вроде .show() , .collect() или .toPandas() . Поэтому весь блок try выполняется без ошибок, потому что Spark еще ничего не оценил. Он только составил план выполнения.
  2. Чтобы решить эту проблему, вместо того, чтобы перехватывать ошибку, замените вашу лямбда-функцию на
 lambda z: dateparser.parse(z).date().strftime("%Y-%m-%d") if z is not None else " "
 

а также замените строку

 when(col(source_column_name).isNotNull(), to_date(parse_date(col(source_column_name)))).otherwise(" ")
 

с

 to_date(parse_date(col(source_column_name)))
 

Это известная ошибка Spark для оценки UDF, даже если when условие равно False, поэтому вместо этого поместите условие внутри UDF.

В целом:

 parse_date = udf(                
    lambda z: dateparser.parse(z).date().strftime("%Y-%m-%d") if z is not None else " ",
    StringType()
)

build_df = source_df.withColumn(
    "date_field",
     to_date(parse_date(col(source_column_name)))
)
 

Комментарии:

1. Спасибо, что сообщили о существующей ошибке. Изменен udf, чтобы исключить значения столбцов, когда невозможно проанализировать — если dateparser.parse(z) не является никем другим » «.