#scala #apache-spark #apache-spark-sql
#scala #apache-spark #apache-spark-sql
Вопрос:
Здравствуйте, я новичок в spark и scala, и я хотел бы разделить следующий фрейм данных:
df:
---------- ----- ------ ---------- --------
| Ts| Temp| Wind| Precipit|Humidity|
---------- ----- ------ ---------- --------
|1579647600| 10| 22| 10| 50|
|1579734000| 11| 21| 10| 55|
|1579820400| 10| 18| 15| 60|
|1579906800| 9| 23| 20| 60|
|1579993200| 8| 24| 25| 50|
|1580079600| 10| 18| 27| 60|
|1580166000| 11| 20| 30| 50|
|1580252400| 12| 17| 15| 50|
|1580338800| 10| 14| 21| 50|
|1580425200| 9| 16| 25| 60|
----------- ----- ------ ---------- --------
Результирующие фреймы данных должны быть следующими:
df1:
---------- ----- ------ ---------- --------
| Ts| Temp| Wind| Precipit|Humidity|
---------- ----- ------ ---------- --------
|1579647600| 10| 22| 10| 50|
|1579734000| 11| 21| 10| 55|
|1579820400| 10| 18| 15| 60|
|1579906800| 9| 23| 20| 60|
|1579993200| 8| 24| 25| 50|
|1580079600| 10| 18| 27| 60|
|1580166000| 11| 20| 30| 50|
|1580252400| 12| 17| 15| 50|
---------- ----- ------ ---------- --------
df2:
---------- ----- ------ ---------- --------
| Ts| Temp| Wind| Precipit|Humidity|
---------- ----- ------ ---------- --------
|1580338800| 10| 14| 21| 50|
|1580425200| 9| 16| 25| 60|
----------- ----- ------ ---------- --------
где df1 имеет 80% верхних строк df, а df2 — оставшиеся 20%.
Ответ №1:
Попробуйте использовать monotonically_increasing_id()
функцию с window percent_rank()
, так как эта функция сохраняет порядок.
Example:
val df=sc.parallelize(Seq((1579647600,10,22,10,50),
(1579734000,11,21,10,55),
(1579820400,10,18,15,60),
(1579906800, 9,23,20,60),
(1579993200, 8,24,25,50),
(1580079600,10,18,27,60),
(1580166000,11,20,30,50),
(1580252400,12,17,15,50),
(1580338800,10,14,21,50),
(1580425200, 9,16,25,60)),10).toDF("Ts","Temp","Wind","Precipit","Humidity")
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions._
val df1=df.withColumn("mid",monotonically_increasing_id)
val df_above_80=df1.withColumn("pr",percent_rank().over(w)).filter(col("pr") >= 0.8).drop(Seq("mid","pr"):_*)
val df_below_80=df1.withColumn("pr",percent_rank().over(w)).filter(col("pr") < 0.8).drop(Seq("mid","pr"):_*)
df_below_80.show()
/*
---------- ---- ---- -------- --------
| Ts|Temp|Wind|Precipit|Humidity|
---------- ---- ---- -------- --------
|1579647600| 10| 22| 10| 50|
|1579734000| 11| 21| 10| 55|
|1579820400| 10| 18| 15| 60|
|1579906800| 9| 23| 20| 60|
|1579993200| 8| 24| 25| 50|
|1580079600| 10| 18| 27| 60|
|1580166000| 11| 20| 30| 50|
|1580252400| 12| 17| 15| 50|
---------- ---- ---- -------- --------
*/
df_above_80.show()
/*
---------- ---- ---- -------- --------
| Ts|Temp|Wind|Precipit|Humidity|
---------- ---- ---- -------- --------
|1580338800| 10| 14| 21| 50|
|1580425200| 9| 16| 25| 60|
---------- ---- ---- -------- --------
*/
Ответ №2:
Предполагая, что данные разделяются случайным образом:
val Array(df1, df2) = df.randomSplit(Array(0.8, 0.2))
Однако, если под «верхними строками» вы подразумеваете столбец ‘Ts’ в вашем примере фрейма данных, то вы могли бы сделать это:
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col,percent_rank}
val window = Window.partitionBy().orderBy(df['Ts'].desc())
val df1 = df.select('*', percent_rank().over(window).alias('rank'))
.filter(col('rank') >= 0.2)
.show()
val df2 = df.select('*', percent_rank().over(window).alias('rank'))
.filter(col('rank') < 0.2)
.show()
Комментарии:
1. Я имел в виду, что я хочу разделить фрейм данных, не изменяя порядок его строк, и 80% строк, которые появляются первыми, должны быть в df1, а оставшиеся 20% строк должны появиться в df2
2. тогда сработает второй вариант, поскольку ваш набор данных упорядочен по Ts. ответ 484 выше также будет работать для вас