#apache-spark #pyspark #apache-spark-sql
Вопрос:
Я хочу добавить столбец для следующей метки времени. Я ссылался на различные веб-сайты и создавал код, похожий на них. Однако произошла ошибка. Как это исправить? и почему я получил ошибку?
-------------------- | Timestamp| -------------------- |2014-04-01 12:00:...| |2014-04-01 12:00:...| |2014-04-01 12:00:...| |2014-04-01 12:00:...| |2014-04-01 12:00:...| -------------------- w = Window.partitionBy(("Timestamp")).orderBy(("Timestamp")) df_FD.withColumn("end_date", lead("Timestamp", 1).over(w)).show(3)
сообщение об ошибке
Py4JJavaError Traceback (most recent call last) /var/folders/3f/cj3qrr9x2dlgwkp147wyd6280000gn/T/ipykernel_20362/1581562162.py in lt;modulegt; 5 from pyspark.sql.functions import * 6 w = Window.partitionBy(("Timestamp")).orderBy(("Timestamp")) ----gt; 7 df_FD.withColumn("end_date", lead("Timestamp", 1).over(w)).show(3) 8 . . . Py4JJavaError: An error occurred while calling o1334.showString. : org.apache.spark.sql.catalyst.errors.package$TreeNodeException: execute, tree: Exchange rangepartitioning(Timestamp#541 ASC NULLS FIRST, 200) - *(1) FileScan csv [Session_ID#540,Timestamp#541,Item_ID#542,Category#543] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/Users/iwayamayuto/Desktop/yoochoose-clicks.dat], PartitionFilters: [], PushedFilters: [], ReadSchema: structlt;Session_ID:string,Timestamp:timestamp,Item_ID:string,Category:stringgt;
Ответ №1:
Вы получаете ошибку, потому Window.partitionBy(("Timestamp")).orderBy(("Timestamp"))
что на самом деле это не имеет смысла. Действительно, вы разбиваете на Timestamp
разделы, а затем в каждом разделе упорядочиваете записи по Timestamp
. В каждом разделе все Timestamp
равны, поэтому сортировка по их значению не имеет смысла, а Spark этого не позволяет.
Вычисление lead
(или lag
, если на то пошло) значения по всему кадру данных в spark сложно. Spark-это механизм распределенных вычислений, который выполняет вычисления с учетом разделов. Решением было бы не разделять окно:
w = Window.orderBy(("Timestamp")) df_FD.withColumn("end_date", lead("Timestamp", 1).over(w)).show()
И все же это не очень хорошая идея. Spark выдаст предупреждение, потому что это подразумевает размещение всех данных в одном разделе. Если это возможно, то вам, вероятно, в первую очередь не нужна искра. В общем, это приводит к ошибке OOM.
Однако существует более сложное решение (пояснения приведены в комментариях).
# let's sort the dataframe by timestamp and add a column containing # the partition index sorted_df = df.orderBy("Timestamp") .select("Timestamp", F.spark_partition_id().alias("partition_index")) .cache() # let's collect the minimal timestamp of each partition. # That timestamp is the lead of the last timestamp of the previous partition last_lead = sorted_df.groupBy("partition_index") .agg(F.min(F.col("Timestamp")).alias("first_ts")) .withColumn("partition_index", F.col("partition_index") - 1) # Let's now create a window partitioned by partition index and ordered # by Timestamp w = Window.partitionBy("partition_index").orderBy("Timestamp") # We compute the lead within each partition of w, and join with last_lead # to compute the lead of the last item of each partition sorted_df.withColumn("lead", F.lead("Timestamp", 1).over(w)) .join(last_lead, "partition_index", "left") .withColumn("lead", F.coalesce("lead", "first_ts")) .drop("first_ts").show()