Как реализовать лидерство в фрейме данных Spark

#apache-spark #pyspark #apache-spark-sql

Вопрос:

Я хочу добавить столбец для следующей метки времени. Я ссылался на различные веб-сайты и создавал код, похожий на них. Однако произошла ошибка. Как это исправить? и почему я получил ошибку?

  --------------------  | Timestamp|  --------------------  |2014-04-01 12:00:...| |2014-04-01 12:00:...| |2014-04-01 12:00:...| |2014-04-01 12:00:...| |2014-04-01 12:00:...|  --------------------    w = Window.partitionBy(("Timestamp")).orderBy(("Timestamp")) df_FD.withColumn("end_date", lead("Timestamp", 1).over(w)).show(3)  

сообщение об ошибке

 Py4JJavaError Traceback (most recent call last) /var/folders/3f/cj3qrr9x2dlgwkp147wyd6280000gn/T/ipykernel_20362/1581562162.py in lt;modulegt;  5 from pyspark.sql.functions import *  6 w = Window.partitionBy(("Timestamp")).orderBy(("Timestamp")) ----gt; 7 df_FD.withColumn("end_date", lead("Timestamp", 1).over(w)).show(3)  8  . . .  Py4JJavaError: An error occurred while calling o1334.showString. : org.apache.spark.sql.catalyst.errors.package$TreeNodeException: execute, tree: Exchange rangepartitioning(Timestamp#541 ASC NULLS FIRST, 200)  - *(1) FileScan csv [Session_ID#540,Timestamp#541,Item_ID#542,Category#543] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/Users/iwayamayuto/Desktop/yoochoose-clicks.dat], PartitionFilters: [], PushedFilters: [], ReadSchema: structlt;Session_ID:string,Timestamp:timestamp,Item_ID:string,Category:stringgt;  

Ответ №1:

Вы получаете ошибку, потому Window.partitionBy(("Timestamp")).orderBy(("Timestamp")) что на самом деле это не имеет смысла. Действительно, вы разбиваете на Timestamp разделы, а затем в каждом разделе упорядочиваете записи по Timestamp . В каждом разделе все Timestamp равны, поэтому сортировка по их значению не имеет смысла, а Spark этого не позволяет.

Вычисление lead (или lag , если на то пошло) значения по всему кадру данных в spark сложно. Spark-это механизм распределенных вычислений, который выполняет вычисления с учетом разделов. Решением было бы не разделять окно:

 w = Window.orderBy(("Timestamp")) df_FD.withColumn("end_date", lead("Timestamp", 1).over(w)).show()  

И все же это не очень хорошая идея. Spark выдаст предупреждение, потому что это подразумевает размещение всех данных в одном разделе. Если это возможно, то вам, вероятно, в первую очередь не нужна искра. В общем, это приводит к ошибке OOM.

Однако существует более сложное решение (пояснения приведены в комментариях).

 # let's sort the dataframe by timestamp and add a column containing # the partition index sorted_df = df.orderBy("Timestamp")  .select("Timestamp", F.spark_partition_id().alias("partition_index"))  .cache()  # let's collect the minimal timestamp of each partition. # That timestamp is the lead of the last timestamp of the previous partition last_lead = sorted_df.groupBy("partition_index")  .agg(F.min(F.col("Timestamp")).alias("first_ts"))  .withColumn("partition_index", F.col("partition_index") - 1)  # Let's now create a window partitioned by partition index and ordered # by Timestamp w = Window.partitionBy("partition_index").orderBy("Timestamp")  # We compute the lead within each partition of w, and join with last_lead # to compute the lead of the last item of each partition sorted_df.withColumn("lead", F.lead("Timestamp", 1).over(w))  .join(last_lead, "partition_index", "left")  .withColumn("lead", F.coalesce("lead", "first_ts"))  .drop("first_ts").show()