#python #apache-spark #pyspark #user-defined-functions
Question:
I'm trying to use a UDF, but I get the following error:
import time
import datetime
from pyspark.sql.functions import lit, unix_timestamp, udf, col
from pyspark.sql.types import TimestampType, DecimalType

data = [{'name': 'Alice', 'age': 1}, {'name': 'Again', 'age': 2}]
df = spark.createDataFrame(data)

timestamp1 = datetime.datetime.fromtimestamp(time.time()).strftime('%Y-%m-%d %H:%M:%S')
timestamp2 = datetime.datetime.fromtimestamp(time.time() + 90).strftime('%Y-%m-%d %H:%M:%S')

def calc_time(start, end):
    timefmt = "yyyy-MM-dd'T'HH:mm:ss"
    # unix_timestamp here is the Spark SQL function, not plain Python
    return unix_timestamp(end, format=timefmt) - unix_timestamp(start, format=timefmt)

calc_time_udf = udf(lambda start, end: calc_time(start, end), TimestampType())

new_df = (df.withColumn('time1', unix_timestamp(lit(timestamp1), 'yyyy-MM-dd HH:mm:ss').cast("timestamp"))
            .withColumn('time2', unix_timestamp(lit(timestamp2), 'yyyy-MM-dd HH:mm:ss').cast("timestamp")))

new_df.withColumn("DIFF", calc_time_udf(col("time1"), col("time2")).cast(DecimalType(20, 6))).show()
Error stack trace:
File "/opt/cloudera/parcels/CDH-6.3.4-1.cdh6.3.4.p0.6626826/lib/spark/python/pyspark/sql/functions.py", line 1253, in unix_timestamp
    return Column(sc._jvm.functions.unix_timestamp(_to_java_column(timestamp), format))
AttributeError: 'NoneType' object has no attribute '_jvm'
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:456)
at org.apache.spark.sql.execution.python.PythonUDFRunner$anon$1.read(PythonUDFRunner.scala:81)
at org.apache.spark.sql.execution.python.PythonUDFRunner$anon$1.read(PythonUDFRunner.scala:64)
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:410)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at scala.collection.Iterator$anon$12.hasNext(Iterator.scala:440)
at scala.collection.Iterator$anon$11.hasNext(Iterator.scala:409)
at scala.collection.Iterator$anon$11.hasNext(Iterator.scala:409)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$anonfun$11$anon$1.hasNext(WholeStageCodegenExec.scala:624)
at org.apache.spark.sql.execution.SparkPlan$anonfun$2.apply(SparkPlan.scala:255)
at org.apache.spark.sql.execution.SparkPlan$anonfun$2.apply(SparkPlan.scala:247)
at org.apache.spark.rdd.RDD$anonfun$mapPartitionsInternal$1$anonfun$apply$24.apply(RDD.scala:858)
at org.apache.spark.rdd.RDD$anonfun$mapPartitionsInternal$1$anonfun$apply$24.apply(RDD.scala:858)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:121)
at org.apache.spark.executor.Executor$TaskRunner$anonfun$11.apply(Executor.scala:407)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1408)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:413)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
Comments:
1. You are mixing Python and Spark functions in your UDF. A UDF must be a pure Python function. Can you explain what you are trying to do?
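For illustration only, here is a minimal sketch of what a pure-Python version could look like (the name calc_time_py is hypothetical, and it assumes time1/time2 are already TimestampType columns as in the question; Spark passes such values into a Python UDF as datetime.datetime objects):

from pyspark.sql.functions import udf, col
from pyspark.sql.types import DoubleType

# Pure-Python UDF: TimestampType values arrive as datetime.datetime,
# so the duration can be computed with plain Python arithmetic.
@udf(DoubleType())
def calc_time_py(start, end):
    return (end - start).total_seconds()

new_df.withColumn("DIFF", calc_time_py(col("time1"), col("time2"))).show()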
Answer #1:
You might want to try this approach:
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import DoubleType

# Vectorized UDF: start and end arrive as pandas Series of timestamps
@pandas_udf(DoubleType())
def ts_diff(start, end):
    return (end - start).dt.total_seconds()
then, using new_df from your question:
>>> new_df.withColumn("DIFF", ts_diff("time1", "time2")).show()
+---+-----+-------------------+-------------------+----+
|age| name|              time1|              time2|DIFF|
+---+-----+-------------------+-------------------+----+
|  1|Alice|2021-07-25 17:21:58|2021-07-25 17:23:36|98.0|
|  2|Again|2021-07-25 17:21:58|2021-07-25 17:23:36|98.0|
+---+-----+-------------------+-------------------+----+
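As a side note (not part of the original answer): for a simple difference like this, a UDF can be avoided entirely, since casting a Spark timestamp column to long yields epoch seconds:

from pyspark.sql.functions import col

# Built-in column arithmetic: timestamp cast to long gives epoch seconds
new_df.withColumn("DIFF", col("time2").cast("long") - col("time1").cast("long")).show()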