Поиск максимального значения из столбца и заполнение другого столбца на основе максимального значения

#python-3.x #apache-spark #pyspark #apache-spark-sql #pyspark-dataframes

#python-3.x #apache-spark #пайспарк #apache-spark-sql #pyspark-фреймы данных

Вопрос:

У меня есть инкрементная загрузка в CSV-файлах. Я прочитал csv в фрейме данных. Фрейм данных содержит один столбец, содержащий несколько строк. Я должен найти отдельные строки из этого столбца и присвоить ID (целое число) каждому значению, начиная с 0 после присоединения к одному другому фрейму данных.

В следующем запуске я должен назначить идентификатор после определения максимального значения в ID столбце и увеличения его для разных строк. Везде, где в ID столбце есть null, я должен увеличить его ( 1) от значения предыдущего запуска.

ПЕРВЫЙ ЗАПУСК

строка ID
ноль 0
Первый 1
второй 2
третий 3
четвертое 4

ВТОРОЙ ЗАПУСК

MAX(ID) = 4

строка ID
ноль 0
Первый 1
второй 2
третий 3
четвертое 4
пятое 5
шестое 6
седьмой 7
восьмое 8

Я пробовал это, но не смог заставить это работать..

 max = df.agg({"ID": "max"}).collect()[0][0]
df_incremented = df.withcolumn("ID", when(col("ID").isNull(),expr("max  = 1")))
 

Дайте мне знать, если есть простой способ добиться этого.

Ответ №1:

Поскольку вы сохраняете только отдельные значения, вы можете использовать row_number функцию поверх window :

 from pyspark.sql import Window
from pyspark.sql import functions as F

 df = spark.createDataFrame(
    [("a",), ("a",), ("b",), ("c",), ("d",), ("e",), ("e",)],
    ("string",)
)

w = Window.orderBy("string")

df1 = df.distinct().withColumn("ID", F.row_number().over(w) - 1)

df1.show()
# ------ --- 
#|string| ID|
# ------ --- 
#|     a|  0|
#|     b|  1|
#|     c|  2|
#|     d|  3|
#|     e|  4|
# ------ --- 
 

Теперь давайте добавим несколько строк в этот фрейм данных и будем использовать row_number вместе с coalesce , чтобы присваивать ID только для строки, где она равна нулю (не нужно получать максимальное значение):

 df2 = df1.union(spark.sql("select * from values ('f', null), ('h', null), ('i', null)"))

df3 = df2.withColumn("ID", F.coalesce("ID", F.row_number(w) - 1))

df3.show()
# ------ --- 
#|string| ID|
# ------ --- 
#|     a|  0|
#|     b|  1|
#|     c|  2|
#|     d|  3|
#|     e|  4|
#|     f|  5|
#|     h|  6|
#|     i|  7|
# ------ --- 
 

Если вы хотите сохранить дублирующиеся значения и присвоить им одинаковые ID значения , используйте dense_rank вместо row_number .

Комментарии:

1. спасибо @blackbishop. Я думаю, ваша логика выглядит лучше. Но мой тоже работал на меня сейчас. Я попробовал это снова с некоторыми изменениями .. df.withColumn(id_col_name, F.когда(F.col(«ID»). isNull(), F.row_number().over(Window.OrderBy(F.monotonically_increasing_id()))) .в противном случае(df[«ID»]))

2. Привет @hotshot_02, да, coalesce case/when в данном случае это то же самое, что и выражение, я предпочел его, потому что оно короче 😉 Рад, что это помогло вам решить вопрос!