Как разделить фрейм данных на два фрейма данных на основе общего количества строк в исходном фрейме данных

#scala #apache-spark #apache-spark-sql

#scala #apache-spark #apache-spark-sql

Вопрос:

Здравствуйте, я новичок в spark и scala, и я хотел бы разделить следующий фрейм данных:

 df:
 ---------- ----- ------ ---------- -------- 
|        Ts| Temp|  Wind|  Precipit|Humidity|
 ---------- ----- ------ ---------- -------- 
|1579647600|   10|    22|        10|      50|
|1579734000|   11|    21|        10|      55|
|1579820400|   10|    18|        15|      60|
|1579906800|    9|    23|        20|      60|
|1579993200|    8|    24|        25|      50|
|1580079600|   10|    18|        27|      60|
|1580166000|   11|    20|        30|      50|
|1580252400|   12|    17|        15|      50|
|1580338800|   10|    14|        21|      50|
|1580425200|    9|    16|        25|      60|
----------- ----- ------ ---------- -------- 
  

Результирующие фреймы данных должны быть следующими:

 df1:
 ---------- ----- ------ ---------- -------- 
|        Ts| Temp|  Wind|  Precipit|Humidity|
 ---------- ----- ------ ---------- -------- 
|1579647600|   10|    22|        10|      50|
|1579734000|   11|    21|        10|      55|
|1579820400|   10|    18|        15|      60|
|1579906800|    9|    23|        20|      60|
|1579993200|    8|    24|        25|      50|
|1580079600|   10|    18|        27|      60|
|1580166000|   11|    20|        30|      50|
|1580252400|   12|    17|        15|      50|
 ---------- ----- ------ ---------- -------- 
df2:
 ---------- ----- ------ ---------- -------- 
|        Ts| Temp|  Wind|  Precipit|Humidity|
 ---------- ----- ------ ---------- -------- 
|1580338800|   10|    14|        21|      50|
|1580425200|    9|    16|        25|      60|
----------- ----- ------ ---------- -------- 
  

где df1 имеет 80% верхних строк df, а df2 — оставшиеся 20%.

Ответ №1:

Попробуйте использовать monotonically_increasing_id() функцию с window percent_rank() , так как эта функция сохраняет порядок.

Example:

 val df=sc.parallelize(Seq((1579647600,10,22,10,50),
(1579734000,11,21,10,55),
(1579820400,10,18,15,60),
(1579906800, 9,23,20,60),
(1579993200, 8,24,25,50),
(1580079600,10,18,27,60),
(1580166000,11,20,30,50),
(1580252400,12,17,15,50),
(1580338800,10,14,21,50),
(1580425200, 9,16,25,60)),10).toDF("Ts","Temp","Wind","Precipit","Humidity")

import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions._

val df1=df.withColumn("mid",monotonically_increasing_id)
val df_above_80=df1.withColumn("pr",percent_rank().over(w)).filter(col("pr") >= 0.8).drop(Seq("mid","pr"):_*)
val df_below_80=df1.withColumn("pr",percent_rank().over(w)).filter(col("pr") < 0.8).drop(Seq("mid","pr"):_*)

df_below_80.show()
/*
 ---------- ---- ---- -------- -------- 
|        Ts|Temp|Wind|Precipit|Humidity|
 ---------- ---- ---- -------- -------- 
|1579647600|  10|  22|      10|      50|
|1579734000|  11|  21|      10|      55|
|1579820400|  10|  18|      15|      60|
|1579906800|   9|  23|      20|      60|
|1579993200|   8|  24|      25|      50|
|1580079600|  10|  18|      27|      60|
|1580166000|  11|  20|      30|      50|
|1580252400|  12|  17|      15|      50|
 ---------- ---- ---- -------- -------- 
*/

df_above_80.show()
/*
 ---------- ---- ---- -------- -------- 
|        Ts|Temp|Wind|Precipit|Humidity|
 ---------- ---- ---- -------- -------- 
|1580338800|  10|  14|      21|      50|
|1580425200|   9|  16|      25|      60|
 ---------- ---- ---- -------- -------- 
*/
  

Ответ №2:

Предполагая, что данные разделяются случайным образом:

 val Array(df1, df2) = df.randomSplit(Array(0.8, 0.2))
  

Однако, если под «верхними строками» вы подразумеваете столбец ‘Ts’ в вашем примере фрейма данных, то вы могли бы сделать это:

 import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col,percent_rank}

val window = Window.partitionBy().orderBy(df['Ts'].desc())

val df1 = df.select('*', percent_rank().over(window).alias('rank')) 
  .filter(col('rank') >= 0.2) 
  .show()

val df2 = df.select('*', percent_rank().over(window).alias('rank')) 
  .filter(col('rank') < 0.2) 
  .show()
  

Комментарии:

1. Я имел в виду, что я хочу разделить фрейм данных, не изменяя порядок его строк, и 80% строк, которые появляются первыми, должны быть в df1, а оставшиеся 20% строк должны появиться в df2

2. тогда сработает второй вариант, поскольку ваш набор данных упорядочен по Ts. ответ 484 выше также будет работать для вас