#pyspark #lag #row-number
Вопрос:
Я хотел бы создать номер строки, разделенный по УЧЕТНОЙ записи, ИМЕНИ и ТИПУ.
Я попробовал плотный ряд и номер строки. Однако мне нужны все исходные записи, содержащие изменения в любом из этих столбцов
df = spark.createDataFrame( [ ('20190910', 'A1', 'Linda', 'b2c'), ('20190911', 'A1', 'Tom', 'consultant'), ('20190912', 'A1', 'John', 'b2c'), ('20190913', 'A1', 'Tom', 'consultant'), ('20190914', 'A1', 'Tom', 'consultant'), ('20190915', 'A1', 'Linda', 'consultant'), ('20190916', 'A1', 'Linda', 'b2c'), ('20190917', 'B1', 'John', 'b2c'), ('20190916', 'B1', 'John', 'consultant'), ('20190910', 'B1', 'Linda', 'b2c'), ('20190911', 'B1', 'John', 'b2c'), ('20190915', 'C1', 'John', 'consultant'), ('20190916', 'C1', 'Linda', 'consultant'), ('20190917', 'C1', 'John', 'b2c'), ('20190916', 'C1', 'RJohn', 'consultant'), ('20190910', 'C1', 'Tom', 'b2c'), ('20190911', 'C1', 'John', 'b2c'), ], ['Event_date', 'account', 'name', 'type'] )
Ожидаемые результаты:
Дата события | Учетная запись | Имя | Тип | номер строки |
---|---|---|---|---|
20190910 | A1 | Линда | b2c | 1 |
20190911 | A1 | Том | консультант | 1 |
20190912 | A1 | Джон | b2c | 1 |
20190913 | A1 | Том | консультант | 2 |
20190914 | A1 | Том | консультант | 3 |
20190915 | A1 | Линда | консультант | 1 |
20190916 | A1 | Линда | b2c | 2 |
20190917 | В1 | Джон | b2c | 1 |
20190916 | В1 | Джон | консультант | 1 |
20190910 | В1 | Линда | b2c | 2 |
20190911 | В1 | Джон | b2c | 3 |
20190915 | C1 | Джон | консультант | 1 |
20190916 | C1 | Линда | консультант | 1 |
20190917 | C1 | Джон | b2c | 1 |
20190916 | C1 | Джон | консультант | 2 |
20190910 | C1 | Том | b2c | 1 |
20190911 | C1 | Джон | b2c | 2 |
Комментарии:
1. Я предполагаю, что после разделения вы хотите назначить номера строк в том же порядке, в каком они отображаются в исходном кадре данных, однако это невозможно из-за распределенной природы spark, мы не можем предполагать неявный порядок. Я бы предложил включить номер строки в исходный кадр данных и порядок, основанный на этом, и сгенерировать номер строки после разделения.
Ответ №1:
Вы можете создать a Window
и разделить его на account
, name
, type
а затем row_number
поверх него.
Пример:
spark = SparkSession.builder.getOrCreate() df = spark.createDataFrame( [ ("20190910", "A1", "Linda", "b2c"), ("20190911", "A1", "Tom", "consultant"), ("20190912", "A1", "John", "b2c"), ("20190913", "A1", "Tom", "consultant"), ("20190914", "A1", "Tom", "consultant"), ("20190915", "A1", "Linda", "consultant"), ("20190916", "A1", "Linda", "b2c"), ("20190917", "B1", "John", "b2c"), ("20190916", "B1", "John", "consultant"), ("20190910", "B1", "Linda", "b2c"), ("20190911", "B1", "John", "b2c"), ("20190915", "C1", "John", "consultant"), ("20190916", "C1", "Linda", "consultant"), ("20190917", "C1", "John", "b2c"), ("20190916", "C1", "RJohn", "consultant"), ("20190910", "C1", "Tom", "b2c"), ("20190911", "C1", "John", "b2c"), ], ["Event_date", "account", "name", "type"], ) w = Window.partitionBy("account", "name", "type").orderBy("Event_date") df = df.withColumn("row_number", F.row_number().over(w)).orderBy("Event_date")
Результат:
---------- ------- ----- ---------- ---------- |Event_date|account|name |type |row_number| ---------- ------- ----- ---------- ---------- |20190912 |A1 |John |b2c |1 | |20190911 |A1 |Tom |consultant|1 | |20190913 |A1 |Tom |consultant|2 | |20190914 |A1 |Tom |consultant|3 | |20190915 |A1 |Linda|consultant|1 | |20190910 |A1 |Linda|b2c |1 | |20190916 |A1 |Linda|b2c |2 | |20190911 |B1 |John |b2c |1 | |20190916 |B1 |John |consultant|1 | |20190917 |B1 |John |b2c |2 | |20190910 |B1 |Linda|b2c |1 | |20190910 |C1 |Tom |b2c |1 | |20190915 |C1 |John |consultant|1 | |20190916 |C1 |RJohn|consultant|1 | |20190911 |C1 |John |b2c |1 | |20190916 |C1 |Linda|consultant|1 | |20190917 |C1 |John |b2c |2 | ---------- ------- ----- ---------- ----------
It’s not exactly the same as your expected outcome, since it’s ordered by Event_date
and account
.
Your expected output doesn’t seem to be consistent. Please check the numbers again, especially for
B1
. AlsoRJohn
in the input data.
Ответ №2:
You can do something that partition by Account, Name and Type. Then you can order by account followed by event_date.
from pyspark.sql.functions import * from pyspark.sql.window import Window df = spark.createDataFrame( [ ("20190910", "A1", "Linda", "b2c"), ("20190911", "A1", "Tom", "consultant"), ("20190912", "A1", "John", "b2c"), ("20190913", "A1", "Tom", "consultant"), ("20190914", "A1", "Tom", "consultant"), ("20190915", "A1", "Linda", "consultant"), ("20190916", "A1", "Linda", "b2c"), ("20190917", "B1", "John", "b2c"), ("20190916", "B1", "John", "consultant"), ("20190910", "B1", "Linda", "b2c"), ("20190911", "B1", "John", "b2c"), ("20190915", "C1", "John", "consultant"), ("20190916", "C1", "Linda", "consultant"), ("20190917", "C1", "John", "b2c"), ("20190916", "C1", "John", "consultant"), ("20190910", "C1", "Tom", "b2c"), ("20190911", "C1", "John", "b2c"), ], ["Event_date", "account", "name", "type"], ) w = Window.partitionBy("account", "name", "type").orderBy("Event_date") df = df.withColumn("row_number", row_number().over(w)).orderBy("account","Event_date")