Фрейм данных становится пустым после добавления фрейма данных внутри цикла scala for

#scala #apache-spark-sql

#scala #apache-spark-sql

Вопрос:

На самом деле, я пытаюсь добавить фрейм данных к пустому фрейму данных в цикле for в scala. но добавленный фрейм данных становится пустым каждый раз. ниже приведен код

 import org.apache.spark.sql.SparkSession
import org.apache.spark.sql._
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._
import java.io._
import org.apache.spark.sql.DataFrame

object obj_Spark_url_Zipcode { 
  def main(args:Array[String]):Unit={

val spark = SparkSession.builder().appName("Spark_Url_Zip").master("local[*]").getOrCreate()
val sc    = spark.sparkContext
import spark.implicits._
sc.setLogLevel("Error")
System.setProperty("http.agent","chrome")
  
val schema_str     =  "first,gender,state,zip,phone"
val struct_dymic   =  StructType(schema_str.split(",").map(x=>StructField(x, StringType, true)))
val df_empty       =  spark.createDataFrame(sc.emptyRDD[Row],struct_dymic) 

for(i <- 1 to 10)
{
  val url_json_data  =  scala.io.Source.fromURL("https://webapiusr.mue/apii/0.05/?reslts=4554").mkString
  val url_json_rdd   =  sc.parallelize(url_json_data::Nil) //To convert a string to RDD
  val url_json_df    =  spark.read.option("multiline",true).json(url_json_rdd)
  val zipcode_df     =  url_json_df.withColumn("results",explode(col("results")))
                                   .select("results.user.name.first","results.user.gender","results.user.location.state","results.user.location.zip","results.user.phone")
 df_empty.union(zipcode_df)
 println("Curr val : " i)                                      
}
df_empty.show() 
  

}
}

 Result:
#######
Curr val : 1
Curr val : 2
Curr val : 3
Curr val : 4
Curr val : 5
Curr val : 6
Curr val : 7
Curr val : 8
Curr val : 9
Curr val : 10
 ----- ------ ----- --- ----- 
|first|gender|state|zip|phone|
 ----- ------ ----- --- ----- 
 ----- ------ ----- --- ----- 
  

я намерен добавить все фреймы данных, созданные внутри цикла for, в один фрейм данных и записать конечный фрейм данных в target.
Я не знаю, почему он становится пустым.

Я попробовал этот подход в pyspark. Добавление фреймов данных в массив и объединение массива фреймов данных в один фрейм данных. Но в scala я не могу добавлять фреймы данных в массив. (массив фреймов данных)

С уважением, Динеш Кумар

Комментарии:

1. df_empty является неизменяемым. Повторный вызов ничего df_empty.union(zipcode_df) не делает

Ответ №1:

Пример в Scala

 import spark.implicits._

case class ReduceUnion (id: Int, v: String)

val l = Array.range(1,10)

val d = l.map(i => Seq(ReduceUnion(i, s"Test $i")).toDF())

val resultDF = d.reduce(_ union _)

resultDF.printSchema()
resultDF.show(false)
//    root
//    |-- id: integer (nullable = false)
//    |-- v: string (nullable = true)
//  
//     --- ------ 
//    |id |v     |
//     --- ------ 
//    |1  |Test 1|
//    |2  |Test 2|
//    |3  |Test 3|
//    |4  |Test 4|
//    |5  |Test 5|
//    |6  |Test 6|
//    |7  |Test 7|
//    |8  |Test 8|
//    |9  |Test 9|
//     --- ------