получить массив записей между двумя конкретными записями в pyspark

#python #pyspark

#python #pyspark

Вопрос:

Я сталкиваюсь с проблемой получения массива со всеми записями между 2 выделенными записями. У меня есть таблица, которая выглядит примерно следующим образом:

 | Type | State       | Domain | Time     |
|------|-------------|--------|----------|
| A    | eating      | Fruit  | 12:33:11 |
| A    | working     | day    | 12:35:12 |
| A    | working     | day    | 12:44:55 |
| A    | sleep       | day    | 12:59:53 |
| A    | enjoying    | Fruit  | 08:12:04 |
| A    | thinking    | day    | 09:16:32 |
| A    | eating      | Fruit  | 10:44:31 |
| A    | daydreaming | day    | 10:44:33 |
| A    | calling     | day    | 10:59:01 |
| B    | wondering   | Fruit  | 10:00:01 |
| B    | digesting   | day    | 10:49:09 |
| B    | cleaning    | day    | 12:00:27 |
| B    | eating      | Fruit  | 04:03:22 |
  

и я хочу получить вывод в виде следующего:

 | Type | State       | Domain     | Time     | Intermediate Output             | Array Count | Mode Array                 |
|------|-------------|------------|----------|---------------------------------|-------------|----------------------------|
| A    | eating      | Fruit      | 12:33:11 | ['working', 'working', 'sleep'] | 3           | working                    |
| A    | working     | day        | 12:35:12 | None                            | 0           | None                       |
| A    | working     | day        | 12:44:55 | None                            | 0           | None                       |
| A    | sleep       | day        | 12:59:53 | None                            | 0           | None                       |
| A    | enjoying    | Fruit      | 08:12:04 | ['day']                         | 1           | day                        |
| A    | thinking    | day        | 09:16:32 | None                            | 0           | None                       |
| A    | eating      | Fruit      | 10:44:31 | ['daydreaming', 'calling']      | 2           | ['daydreaming', 'calling'] |
| A    | daydreaming | day        | 10:44:33 | None                            | 0           | None                       |
| A    | calling     | day        | 10:59:01 | None                            | 0           | None                       |
| B    | wondering   | Fruit      | 10:00:01 | ['digesting','cleaning']        | 2           | ['digesting','cleaning']   |
| B    | digesting   | day        | 10:49:09 | None                            | 0           | None                       |
| B    | cleaning    | day        | 12:00:27 | None                            | 0           | None                       |
| B    | eating      | Fruit      | 04:03:22 | []                              | 0           | []                         |
  

В основном выполняется разделение по типу и домену, чтобы получить различия между двумя различными значениями домена. Значения в столбце Домен могут принимать только два значения [Фрукты, день].
Я в основном хочу получить все имена состояний в массиве с первого раза, когда домен является Fruit, до появления следующей записи Fruit. Два других столбца должны быть основаны на промежуточном выходе этого массива, чтобы получить длину массива и его режим.
В пределах одного типа может отображаться произвольное количество записей Fruit.
Весь набор данных упорядочен по времени в соответствии со столбцом времени.

К сожалению, инфраструктура допускает только pyspark, поэтому я не могу использовать pandas.

Я был бы очень признателен за любую помощь и советы, так как я новичок в pyspark! Заранее большое вам спасибо!

Ответ №1:

Я использую две оконные функции для решения этой проблемы. (Также предполагается, что ваши записи будут упорядочены по столбцу «Время»)

Поскольку ваш ‘Domain’ может принимать только два значения, я кодирую ‘Fruit’ как 1, а ‘day’ как 0. Мы сделаем incremental sum в этом новом столбце домена, чтобы использовать его в качестве ключа для группы «Состояние».

Удалите первый элемент и сохраните остальные из выходных данных collect_list функции. Для этого я использую remove_first_element UDF .

Вам не нужен ‘array_output’ всякий раз, когда ‘Домен’ равен ‘day’. Поэтому замените его на None всякий раз, когда «Домен» равен «day».

F.size() чтобы получить размер массива и пользовательский UDF get_multi_mode_list_udf для получения значений нескольких режимов

Построение вашего фрейма данных:

 from pyspark.sql.types import *
from pyspark.sql.window import Window
from pyspark.sql import functions as F
from pyspark.sql.functions import col
from pyspark.sql.functions import *

schema = StructType([StructField("Type", StringType()), StructField("State", StringType()),
         StructField("Domain", StringType()), StructField("Time", IntegerType())])

data = [['A', 'eating', 'Fruit', 1], ['A', 'working', 'day', 2], ['A', 'working', 'day', 3], ['A', 'sleep', 'day', 4], ['A', 'enjoying', 'Fruit', 5], ['A', 'thinking', 'day', 6], ['A', 'eating', 'Fruit', 7], ['A', 'daydreaming', 'day', 8], ['A', 'calling', 'day', 9], ['B', 'wondering', 'Fruit', 10], ['B', 'digesting', 'day', 11], ['B', 'cleaning', 'day', 12], ['B', 'eating', 'Fruit', 13]]

df = spark.createDataFrame(data,schema=schema)

df.show()
  

Фактическая операция:

 df1 = df.withColumn("Domain_num", F.when(col("Domain")=="Fruit", 1).otherwise(0))

w1=Window().partitionBy("Type").orderBy("Time")
w2=Window().partitionBy("Type", "incremental_sum")

def remove_first_element(list):
    return list[1:]

remove_first_element_udf = F.udf(remove_first_element, ArrayType(StringType()))

df1 = df1.withColumn("incremental_sum", F.sum("Domain_num").over(w1))
        .withColumn("array_output", collect_list(col("State")).over(w2))
        .withColumn("array_output", remove_first_element_udf(col("array_output")))
        .withColumn("array_output", F.when(col("Domain_num")==0, None).otherwise(col("array_output")))
        .withColumn("array_count", F.size(col("array_output")))
        .withColumn("array_count", F.when(col("Domain_num")==0, 0).otherwise(col("array_count")))

  

Поиск режима:

 from collections import Counter
def get_multi_mode_list(input_array):
    multi_mode = []
    counter_var = Counter(input_array)  
    try:
        temp = counter_var.most_common(1)[0][1]
    except:
        temp = counter_var.most_common(1)
    for i in counter_var: 
        if input_array.count(i) == temp: 
            multi_mode.append(i)
    return(list(set(multi_mode)))

get_multi_mode_list_udf = F.udf(get_multi_mode_list, ArrayType(StringType()))

df1 = df1.withColumn("multi_mode", get_multi_mode_list_udf(col("array_output")))
        .withColumn("multi_mode", F.when(col("Domain_num")==0, None).otherwise(col("multi_mode")))
        .drop("Domain_num", "incremental_sum")
  

Вывод:

 df1.orderBy("Time").show(truncate=False)

 ---- ----------- ------ ---- ------------------------- ----------- ---------------------- 
|Type|State      |Domain|Time|array_output             |array_count|multi_mode            |
 ---- ----------- ------ ---- ------------------------- ----------- ---------------------- 
|A   |eating     |Fruit |1   |[working, working, sleep]|3          |[working]             |
|A   |working    |day   |2   |null                     |0          |null                  |
|A   |working    |day   |3   |null                     |0          |null                  |
|A   |sleep      |day   |4   |null                     |0          |null                  |
|A   |enjoying   |Fruit |5   |[thinking]               |1          |[thinking]            |
|A   |thinking   |day   |6   |null                     |0          |null                  |
|A   |eating     |Fruit |7   |[daydreaming, calling]   |2          |[daydreaming, calling]|
|A   |daydreaming|day   |8   |null                     |0          |null                  |
|A   |calling    |day   |9   |null                     |0          |null                  |
|B   |wondering  |Fruit |10  |[digesting, cleaning]    |2          |[digesting, cleaning] |
|B   |digesting  |day   |11  |null                     |0          |null                  |
|B   |cleaning   |day   |12  |null                     |0          |null                  |
|B   |eating     |Fruit |13  |[]                       |0          |[]                    |
 ---- ----------- ------ ---- ------------------------- ----------- ---------------------- 

  

Комментарии:

1. Это потрясающе! Спасибо вам большое, очень большое! Это мне очень помогло.

2. Рад помочь! Я добавил фрагмент кода, чтобы узнать режим.