Как вызвать UDF, использующий spark sql, и вызвать этот udf в запросе выбора

#python #pyspark #apache-spark-sql #user-defined-functions #azure-databricks

#python #pyspark #apache-spark-sql #определяемые пользователем функции #azure-блоки данных

Вопрос:

Я новичок в программировании на Python. Может кто-нибудь, пожалуйста, помогите мне ниже.

У меня есть UDF, как показано ниже:

 def age(dat):
    try:
        df_min_date = spark.sql(f"""SELECT MIN(opdate) dat1
                      FROM ABC
                      WHERE optype='MOVE'
                      AND opdate<={dat}""").first()[0]
       
        df_min_date=datetime(df_min_date.year,df_min_date.month,df_min_date.day)
        l_dat=datetime.strptime(dat,'%Y-%m-%d').date()
        l_dat=datetime(l_dat.year,l_dat.month,l_dat.day)

        age=(df_min_date-l_dat).days
        if age =='':
          age=0
        return age
    except Exception as ex:
        print(f"Error: {ex}")
        return 0

f_age = spark.udf.register("f_age",age)
 

Я получаю ошибку не удалось сериализовать объект

Мне нужно использовать эту функцию в spark sql, как показано ниже

 spark.sql("""select year,mn, age(dat) age from age""")
 

Как я могу вызвать эту функцию в моем sql-запросе??

Заранее спасибо

Комментарии:

1. В чем смысл вашего UDF? Он вообще не использует входной аргумент dat

2. также, пожалуйста, предоставьте более подробную информацию о ваших таблицах age ABC и dat

3. @mck отредактировал код. Надеюсь, вы понимаете, что мне нужно. Спасибо за вашу помощь

4. @pythonCoder Зачем вам для этого нужен udf? я думаю, вы пытаетесь получить минимальную разницу между dat и наименьшую opDate , где optype = MOVE в днях, следующий sql может это сделать. spark.sql("SELECT year , mn , MIN(case when optype='MOVE' and opdate<=dat then opdate-dat else null end ) MIN_OP_DATE_AGE from ABC group by year , mn ")