pyspark dataframe не изменяется после некоторой обработки

#python #dataframe #apache-spark #pyspark #apache-spark-sql

#python #фрейм данных #apache-spark #pyspark #apache-spark-sql

Вопрос:

Я создаю dateframe и использую функцию window для получения накопительного значения, но после использования функции df и df.select() отображаются с другим порядком строк

 spark = SparkSession.builder.master('local[8]').appName("SparkByExample.com").getOrCreate()
spark.conf.set("spark.sql.shuffle.partitions", '50')

# create dataframe
from pyspark.sql.types import StructType,StructField, StringType, IntegerType,FloatType,LongType,ArrayType
data = [(1,1,2),
        (1,2,2),
        (1,3,2),   
        (1,2,1),
        (2,1,2),
        (2,3,2),
        (2,2,1),
        (3,1,2),
        (3,3,2),
        (3,2,1)]

schema = StructType([ 
    StructField("col1", IntegerType(),True), 
    StructField("col2", IntegerType(),True), 
    StructField("col3", IntegerType(),True)])
 
df = spark.createDataFrame(data=data,schema=schema)
df.show()
 
  ---- ---- ---- 
|col1|col2|col3|
 ---- ---- ---- 
|   1|   1|   2|
|   1|   2|   2|
|   1|   3|   2|
|   1|   2|   1|
|   2|   1|   2|
|   2|   3|   2|
|   2|   2|   1|
|   3|   1|   2|
|   3|   3|   2|
|   3|   2|   1|
 ---- ---- ---- 
 
 # this is window fuction to get the accumulative value
from pyspark.sql.window import Window
w = Window().partitionBy(F.col('col1')).orderBy(F.col('col2'))

def f(lag_val, current_val):
    value = 0
    if lag_val != current_val:
        value = 1
    return value
    
# register udf so we can use with our dataframe
func_udf = F.udf(f, IntegerType())

print(id(df))
df = df.withColumn("new_column", func_udf(F.lag("col3").over(w), df['col3'])).withColumn('new_column2', F.sum('new_column').over(w.partitionBy(F.col('col1')).rowsBetween(Window.unboundedPreceding, 0)))
df.show()
 
  ---- ---- ---- ---------- ----------- 
|col1|col2|col3|new_column|new_column2|
 ---- ---- ---- ---------- ----------- 
|   3|   1|   2|         1|          1|
|   3|   2|   1|         1|          2|
|   3|   3|   2|         1|          3|
|   2|   1|   2|         1|          1|
|   2|   2|   1|         1|          2|
|   2|   3|   2|         1|          3|
|   1|   1|   2|         1|          1|
|   1|   2|   2|         0|          1|
|   1|   2|   1|         1|          2|
|   1|   3|   2|         1|          3|
 ---- ---- ---- ---------- ----------- 
 
 test = df.select('col1','col2','col3')
test.show()
 ---- ---- ---- 
|col1|col2|col3|
 ---- ---- ---- 
|   1|   1|   2|
|   1|   2|   2|
|   1|   3|   2|
|   1|   2|   1|
|   2|   1|   2|
|   2|   3|   2|
|   2|   2|   1|
|   3|   1|   2|
|   3|   3|   2|
|   3|   2|   1|
 ---- ---- ---- 
 

мы можем видеть, что порядок строк ‘col1’, ‘col2’, ‘col3’ df и test отличаются, но test показывает исходный порядок df, это может быть из-за действия и преобразования, но я не уверен.

Ответ №1:

Поскольку в пакетах фреймов данных python иногда порядок автоматически изменяется после выполнения некоторых функций. Если вы имеете в виду отладку, я предлагаю вам разбить свой код следующим образом:

 df = df.withColumn("new_column", func_udf(F.lag("col3").over(w), df['col3'])).withColumn('new_column2', F.sum('new_column').over(w.partitionBy(F.col('col1')).rowsBetween(Window.unboundedPreceding, 0)))
 

к чему-то вроде этого

 df = df.withColumn("new_column", func_udf(F.lag("col3").over(w), df['col3']))
df_partition = w.partitionBy(F.col('col1')).rowsBetween(Window.unboundedPreceding, 0))
df = df.withColumn('new_column2', F.sum('new_column').over(df_partition)
 

Вам следует дополнительно разбить эти строки на меньшие функции, пока вы не поймете, что делает каждая функция.

Комментарии:

1. Спасибо, я знаю, что у pandas dataframe не будет такой проблемы, это происходит в pyspark dataframe

Ответ №2:

Если вы делаете explain() , чтобы проверить планы запросов,

 test.explain()
== Physical Plan ==
*(1) Scan ExistingRDD[col1#255,col2#256,col3#257]

df.explain()
== Physical Plan ==
Window [sum(cast(new_column#288 as bigint)) windowspecdefinition(col1#255, col2#256 ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS new_column2#295L], [col1#255], [col2#256 ASC NULLS FIRST]
 - *(4) Sort [col1#255 ASC NULLS FIRST, col2#256 ASC NULLS FIRST], false, 0
    - Exchange hashpartitioning(col1#255, 50), true, [id=#553]
       - *(3) Project [col1#255, col2#256, col3#257, pythonUDF0#389 AS new_column#288]
          - BatchEvalPython [f(_we0#289, col3#257)], [pythonUDF0#389]
             - Window [lag(col3#257, 1, null) windowspecdefinition(col1#255, col2#256 ASC NULLS FIRST, specifiedwindowframe(RowFrame, -1, -1)) AS _we0#289], [col1#255], [col2#256 ASC NULLS FIRST]
                - *(2) Sort [col1#255 ASC NULLS FIRST, col2#256 ASC NULLS FIRST], false, 0
                   - Exchange hashpartitioning(col1#255, 50), true, [id=#544]
                      - *(1) Scan ExistingRDD[col1#255,col2#256,col3#257]
 

Вы можете видеть, что оптимизатор запросов знал, что test для фрейма данных требуются только столбцы из исходного фрейма данных, поэтому он пропустил все преобразования для новых столбцов, df поскольку они не имеют значения. Вот почему dataframe имел тот же порядок, что и исходный dataframe.

Но для нового df с дополнительными столбцами сортировка была выполнена в оконных функциях, поэтому порядок фрейма данных изменился.

Комментарии:

1. Большое спасибо, но в таком случае, как я могу узнать, какой df будет вызываться для другой работы по программированию?