#python #pyspark
#python #pyspark
Вопрос:
Я сталкиваюсь с проблемой получения массива со всеми записями между 2 выделенными записями. У меня есть таблица, которая выглядит примерно следующим образом:
| Type | State | Domain | Time |
|------|-------------|--------|----------|
| A | eating | Fruit | 12:33:11 |
| A | working | day | 12:35:12 |
| A | working | day | 12:44:55 |
| A | sleep | day | 12:59:53 |
| A | enjoying | Fruit | 08:12:04 |
| A | thinking | day | 09:16:32 |
| A | eating | Fruit | 10:44:31 |
| A | daydreaming | day | 10:44:33 |
| A | calling | day | 10:59:01 |
| B | wondering | Fruit | 10:00:01 |
| B | digesting | day | 10:49:09 |
| B | cleaning | day | 12:00:27 |
| B | eating | Fruit | 04:03:22 |
и я хочу получить вывод в виде следующего:
| Type | State | Domain | Time | Intermediate Output | Array Count | Mode Array |
|------|-------------|------------|----------|---------------------------------|-------------|----------------------------|
| A | eating | Fruit | 12:33:11 | ['working', 'working', 'sleep'] | 3 | working |
| A | working | day | 12:35:12 | None | 0 | None |
| A | working | day | 12:44:55 | None | 0 | None |
| A | sleep | day | 12:59:53 | None | 0 | None |
| A | enjoying | Fruit | 08:12:04 | ['day'] | 1 | day |
| A | thinking | day | 09:16:32 | None | 0 | None |
| A | eating | Fruit | 10:44:31 | ['daydreaming', 'calling'] | 2 | ['daydreaming', 'calling'] |
| A | daydreaming | day | 10:44:33 | None | 0 | None |
| A | calling | day | 10:59:01 | None | 0 | None |
| B | wondering | Fruit | 10:00:01 | ['digesting','cleaning'] | 2 | ['digesting','cleaning'] |
| B | digesting | day | 10:49:09 | None | 0 | None |
| B | cleaning | day | 12:00:27 | None | 0 | None |
| B | eating | Fruit | 04:03:22 | [] | 0 | [] |
В основном выполняется разделение по типу и домену, чтобы получить различия между двумя различными значениями домена. Значения в столбце Домен могут принимать только два значения [Фрукты, день].
Я в основном хочу получить все имена состояний в массиве с первого раза, когда домен является Fruit, до появления следующей записи Fruit. Два других столбца должны быть основаны на промежуточном выходе этого массива, чтобы получить длину массива и его режим.
В пределах одного типа может отображаться произвольное количество записей Fruit.
Весь набор данных упорядочен по времени в соответствии со столбцом времени.
К сожалению, инфраструктура допускает только pyspark, поэтому я не могу использовать pandas.
Я был бы очень признателен за любую помощь и советы, так как я новичок в pyspark! Заранее большое вам спасибо!
Ответ №1:
Я использую две оконные функции для решения этой проблемы. (Также предполагается, что ваши записи будут упорядочены по столбцу «Время»)
Поскольку ваш ‘Domain’ может принимать только два значения, я кодирую ‘Fruit’ как 1, а ‘day’ как 0. Мы сделаем incremental sum
в этом новом столбце домена, чтобы использовать его в качестве ключа для группы «Состояние».
Удалите первый элемент и сохраните остальные из выходных данных collect_list
функции. Для этого я использую remove_first_element UDF
.
Вам не нужен ‘array_output’ всякий раз, когда ‘Домен’ равен ‘day’. Поэтому замените его на None
всякий раз, когда «Домен» равен «day».
F.size()
чтобы получить размер массива и пользовательский UDF get_multi_mode_list_udf
для получения значений нескольких режимов
Построение вашего фрейма данных:
from pyspark.sql.types import *
from pyspark.sql.window import Window
from pyspark.sql import functions as F
from pyspark.sql.functions import col
from pyspark.sql.functions import *
schema = StructType([StructField("Type", StringType()), StructField("State", StringType()),
StructField("Domain", StringType()), StructField("Time", IntegerType())])
data = [['A', 'eating', 'Fruit', 1], ['A', 'working', 'day', 2], ['A', 'working', 'day', 3], ['A', 'sleep', 'day', 4], ['A', 'enjoying', 'Fruit', 5], ['A', 'thinking', 'day', 6], ['A', 'eating', 'Fruit', 7], ['A', 'daydreaming', 'day', 8], ['A', 'calling', 'day', 9], ['B', 'wondering', 'Fruit', 10], ['B', 'digesting', 'day', 11], ['B', 'cleaning', 'day', 12], ['B', 'eating', 'Fruit', 13]]
df = spark.createDataFrame(data,schema=schema)
df.show()
Фактическая операция:
df1 = df.withColumn("Domain_num", F.when(col("Domain")=="Fruit", 1).otherwise(0))
w1=Window().partitionBy("Type").orderBy("Time")
w2=Window().partitionBy("Type", "incremental_sum")
def remove_first_element(list):
return list[1:]
remove_first_element_udf = F.udf(remove_first_element, ArrayType(StringType()))
df1 = df1.withColumn("incremental_sum", F.sum("Domain_num").over(w1))
.withColumn("array_output", collect_list(col("State")).over(w2))
.withColumn("array_output", remove_first_element_udf(col("array_output")))
.withColumn("array_output", F.when(col("Domain_num")==0, None).otherwise(col("array_output")))
.withColumn("array_count", F.size(col("array_output")))
.withColumn("array_count", F.when(col("Domain_num")==0, 0).otherwise(col("array_count")))
Поиск режима:
from collections import Counter
def get_multi_mode_list(input_array):
multi_mode = []
counter_var = Counter(input_array)
try:
temp = counter_var.most_common(1)[0][1]
except:
temp = counter_var.most_common(1)
for i in counter_var:
if input_array.count(i) == temp:
multi_mode.append(i)
return(list(set(multi_mode)))
get_multi_mode_list_udf = F.udf(get_multi_mode_list, ArrayType(StringType()))
df1 = df1.withColumn("multi_mode", get_multi_mode_list_udf(col("array_output")))
.withColumn("multi_mode", F.when(col("Domain_num")==0, None).otherwise(col("multi_mode")))
.drop("Domain_num", "incremental_sum")
Вывод:
df1.orderBy("Time").show(truncate=False)
---- ----------- ------ ---- ------------------------- ----------- ----------------------
|Type|State |Domain|Time|array_output |array_count|multi_mode |
---- ----------- ------ ---- ------------------------- ----------- ----------------------
|A |eating |Fruit |1 |[working, working, sleep]|3 |[working] |
|A |working |day |2 |null |0 |null |
|A |working |day |3 |null |0 |null |
|A |sleep |day |4 |null |0 |null |
|A |enjoying |Fruit |5 |[thinking] |1 |[thinking] |
|A |thinking |day |6 |null |0 |null |
|A |eating |Fruit |7 |[daydreaming, calling] |2 |[daydreaming, calling]|
|A |daydreaming|day |8 |null |0 |null |
|A |calling |day |9 |null |0 |null |
|B |wondering |Fruit |10 |[digesting, cleaning] |2 |[digesting, cleaning] |
|B |digesting |day |11 |null |0 |null |
|B |cleaning |day |12 |null |0 |null |
|B |eating |Fruit |13 |[] |0 |[] |
---- ----------- ------ ---- ------------------------- ----------- ----------------------
Комментарии:
1. Это потрясающе! Спасибо вам большое, очень большое! Это мне очень помогло.
2. Рад помочь! Я добавил фрагмент кода, чтобы узнать режим.