Failed to call a pyspark udf function

#python #apache-spark #pyspark #user-defined-functions

Question:

I'm trying to use a UDF, but I'm getting an error:

 import time
import datetime
from pyspark.sql.functions import lit, unix_timestamp, udf, col
from pyspark.sql.types import TimestampType, DecimalType

dict = [{'name': 'Alice', 'age': 1},{'name': 'Again', 'age': 2}]
df = spark.createDataFrame(dict)

timestamp1 = datetime.datetime.fromtimestamp(time.time()).strftime('%Y-%m-%d %H:%M:%S')
timestamp2 = datetime.datetime.fromtimestamp(time.time() + 90).strftime('%Y-%m-%d %H:%M:%S')


def calc_time(start, end):
    timefmt = "yyyy-MM-dd'T'HH:mm:ss"
    return unix_timestamp(end, format=timefmt) - unix_timestamp(start, format=timefmt)


calc_time_udf = udf(lambda start, end: calc_time(start, end), TimestampType())

new_df = (df.withColumn('time1', unix_timestamp(lit(timestamp1),'yyyy-MM-dd HH:mm:ss').cast("timestamp"))
          .withColumn('time2', unix_timestamp(lit(timestamp2),'yyyy-MM-dd HH:mm:ss').cast("timestamp")))


new_df.withColumn("DIFF", calc_time_udf(col("time1"), col("time2")).cast(DecimalType(20, 6))).show()

 

Error stack trace:

File "/opt/cloudera/parcels/CDH-6.3.4-1.cdh6.3.4.p0.6626826/lib/spark/python/pyspark/sql/functions.py", line 1253, in unix_timestamp
    return Column(sc._jvm.functions.unix_timestamp(_to_java_column(timestamp), format))
AttributeError: 'NoneType' object has no attribute '_jvm'

     at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:456)
    at org.apache.spark.sql.execution.python.PythonUDFRunner$anon$1.read(PythonUDFRunner.scala:81)
    at org.apache.spark.sql.execution.python.PythonUDFRunner$anon$1.read(PythonUDFRunner.scala:64)
    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:410)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
    at scala.collection.Iterator$anon$12.hasNext(Iterator.scala:440)
    at scala.collection.Iterator$anon$11.hasNext(Iterator.scala:409)
    at scala.collection.Iterator$anon$11.hasNext(Iterator.scala:409)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$anonfun$11$anon$1.hasNext(WholeStageCodegenExec.scala:624)
    at org.apache.spark.sql.execution.SparkPlan$anonfun$2.apply(SparkPlan.scala:255)
    at org.apache.spark.sql.execution.SparkPlan$anonfun$2.apply(SparkPlan.scala:247)
    at org.apache.spark.rdd.RDD$anonfun$mapPartitionsInternal$1$anonfun$apply$24.apply(RDD.scala:858)
    at org.apache.spark.rdd.RDD$anonfun$mapPartitionsInternal$1$anonfun$apply$24.apply(RDD.scala:858)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:121)
    at org.apache.spark.executor.Executor$TaskRunner$anonfun$11.apply(Executor.scala:407)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1408)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:413)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
 

Comments:

1. You are mixing Python and Spark functions in your UDF. A UDF has to be a pure Python function. Can you explain what you are trying to do?
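
As the comment points out, the UDF body runs on the executors, where SparkContext._active_spark_context is None, which is why unix_timestamp fails with "'NoneType' object has no attribute '_jvm'". For reference, a UDF that stays in pure Python receives TimestampType columns as datetime.datetime objects and can use plain datetime arithmetic; the following is only a sketch (not from the original thread), reusing the new_df built in the question:

from pyspark.sql.functions import udf, col
from pyspark.sql.types import DoubleType

# Pure-Python UDF: timestamp columns arrive as datetime.datetime objects,
# so the difference can be computed without calling any Spark SQL functions.
diff_seconds_udf = udf(
    lambda start, end: (end - start).total_seconds() if start and end else None,
    DoubleType())

new_df.withColumn("DIFF", diff_seconds_udf(col("time1"), col("time2"))).show()
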

Answer #1:

You might want to try this approach:

from pyspark.sql.functions import lit, unix_timestamp, udf, col
from pyspark.sql.types import DoubleType, DecimalType
from pyspark.sql.functions import pandas_udf

@pandas_udf(DoubleType())
def ts_diff(start, end):
    return (end - start).dt.total_seconds()

 

then, using the new_df from your question:

>>> new_df.withColumn("DIFF", ts_diff("time1", "time2")).show()
+---+-----+-------------------+-------------------+----+
|age| name|              time1|              time2|DIFF|
+---+-----+-------------------+-------------------+----+
|  1|Alice|2021-07-25 17:21:58|2021-07-25 17:23:36|98.0|
|  2|Again|2021-07-25 17:21:58|2021-07-25 17:23:36|98.0|
+---+-----+-------------------+-------------------+----+
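
Note: for a plain difference in seconds, a UDF may not be needed at all; casting the timestamp columns to long (seconds since the epoch) and subtracting gives the same result with built-in column arithmetic. A sketch, assuming the same new_df as above:

from pyspark.sql.functions import col

# No UDF: cast each timestamp to seconds since the epoch and subtract.
new_df.withColumn("DIFF", col("time2").cast("long") - col("time1").cast("long")).show()
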