#python-3.x #apache-spark #pyspark #apache-spark-sql #pyspark-dataframes
#python-3.x #apache-spark #пайспарк #apache-spark-sql #pyspark-фреймы данных
Вопрос:
У меня есть инкрементная загрузка в CSV-файлах. Я прочитал csv в фрейме данных. Фрейм данных содержит один столбец, содержащий несколько строк. Я должен найти отдельные строки из этого столбца и присвоить ID
(целое число) каждому значению, начиная с 0
после присоединения к одному другому фрейму данных.
В следующем запуске я должен назначить идентификатор после определения максимального значения в ID
столбце и увеличения его для разных строк. Везде, где в ID
столбце есть null, я должен увеличить его ( 1) от значения предыдущего запуска.
ПЕРВЫЙ ЗАПУСК
строка | ID |
---|---|
ноль | 0 |
Первый | 1 |
второй | 2 |
третий | 3 |
четвертое | 4 |
ВТОРОЙ ЗАПУСК
MAX(ID) = 4
строка | ID |
---|---|
ноль | 0 |
Первый | 1 |
второй | 2 |
третий | 3 |
четвертое | 4 |
пятое | 5 |
шестое | 6 |
седьмой | 7 |
восьмое | 8 |
Я пробовал это, но не смог заставить это работать..
max = df.agg({"ID": "max"}).collect()[0][0]
df_incremented = df.withcolumn("ID", when(col("ID").isNull(),expr("max = 1")))
Дайте мне знать, если есть простой способ добиться этого.
Ответ №1:
Поскольку вы сохраняете только отдельные значения, вы можете использовать row_number
функцию поверх window :
from pyspark.sql import Window
from pyspark.sql import functions as F
df = spark.createDataFrame(
[("a",), ("a",), ("b",), ("c",), ("d",), ("e",), ("e",)],
("string",)
)
w = Window.orderBy("string")
df1 = df.distinct().withColumn("ID", F.row_number().over(w) - 1)
df1.show()
# ------ ---
#|string| ID|
# ------ ---
#| a| 0|
#| b| 1|
#| c| 2|
#| d| 3|
#| e| 4|
# ------ ---
Теперь давайте добавим несколько строк в этот фрейм данных и будем использовать row_number
вместе с coalesce
, чтобы присваивать ID
только для строки, где она равна нулю (не нужно получать максимальное значение):
df2 = df1.union(spark.sql("select * from values ('f', null), ('h', null), ('i', null)"))
df3 = df2.withColumn("ID", F.coalesce("ID", F.row_number(w) - 1))
df3.show()
# ------ ---
#|string| ID|
# ------ ---
#| a| 0|
#| b| 1|
#| c| 2|
#| d| 3|
#| e| 4|
#| f| 5|
#| h| 6|
#| i| 7|
# ------ ---
Если вы хотите сохранить дублирующиеся значения и присвоить им одинаковые ID
значения , используйте dense_rank
вместо row_number
.
Комментарии:
1. спасибо @blackbishop. Я думаю, ваша логика выглядит лучше. Но мой тоже работал на меня сейчас. Я попробовал это снова с некоторыми изменениями .. df.withColumn(id_col_name, F.когда(F.col(«ID»). isNull(), F.row_number().over(Window.OrderBy(F.monotonically_increasing_id()))) .в противном случае(df[«ID»]))
2. Привет @hotshot_02, да,
coalesce
case/when
в данном случае это то же самое, что и выражение, я предпочел его, потому что оно короче 😉 Рад, что это помогло вам решить вопрос!