Как я могу создать новые строки в существующем фрейме данных? в Писпарке или Скале

#python #scala #apache-spark #pyspark

Вопрос:

Например, теперь у меня есть этот фрейм данных.

  -------- ------ 
|      id|number|
 -------- ------ 
|19891201|     1|
|19891201|     4|
 -------- ------ 
 

Но я хочу, чтобы этот фрейм данных был таким.

 
 -------- ------ 
|      id|number|
 -------- ------ 
|19891201|     1|
|19891201|     2|
|19891201|     3|
|19891201|     4|
 -------- ------ 
 

Я хочу создать новые строки, в которых числа варьируются от значений min() и max() из столбца «число».

В этом примере я хочу иметь строки, значения которых в столбце «число» равны 2 и 3.

Комментарии:

1. версия spark ?

Ответ №1:

Используйте sequence(start, stop, step) функцию из 2.4 версии spark.

 scala> df
 .groupBy($"id")
 .agg(
         min($"number").as("start"),
        max($"number").as("end")
    )
 .selectExpr(
        "id",
        "explode_outer(sequence(start,end,1)) as number"
    )
 .show(false)

 

Выход

  -------- ------ 
|id      |number|
 -------- ------ 
|19891201|1     |
|19891201|2     |
|19891201|3     |
|19891201|4     |
 -------- ------ 
 

Ответ №2:

Попробуйте этот код

 from pyspark.sql.types import StructType
from pyspark.sql.types import StructField
from pyspark.sql.types import ArrayType, FloatType, StringType, IntegerType

from pyspark.sql.functions import min, max , udf, explode

schema = StructType([StructField("id", IntegerType(), True),StructField("number", IntegerType(), True)])
my_list = [(19891201, 1), (19891201,4)]
rdd = sc.parallelize(my_list)
df = sqlContext.createDataFrame(rdd, schema)
df.show()
df2 = df.groupby("id").agg(min("number").alias("min"),max("number").alias("max"))

def my_udf(min, max):
    return list(range(min,max 1))

label_udf = udf(my_udf, ArrayType(IntegerType()))

df3 = df2.withColumn("l", label_udf(df2.min, df2.max)

df4 = df3.withColumn("ll", explode("l"))
df5 = df4.select("id", "ll")
df5.show()