#scala #apache-spark-sql
#scala #apache-spark-sql
Вопрос:
На самом деле, я пытаюсь добавить фрейм данных к пустому фрейму данных в цикле for в scala. но добавленный фрейм данных становится пустым каждый раз. ниже приведен код
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql._
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._
import java.io._
import org.apache.spark.sql.DataFrame
object obj_Spark_url_Zipcode {
def main(args:Array[String]):Unit={
val spark = SparkSession.builder().appName("Spark_Url_Zip").master("local[*]").getOrCreate()
val sc = spark.sparkContext
import spark.implicits._
sc.setLogLevel("Error")
System.setProperty("http.agent","chrome")
val schema_str = "first,gender,state,zip,phone"
val struct_dymic = StructType(schema_str.split(",").map(x=>StructField(x, StringType, true)))
val df_empty = spark.createDataFrame(sc.emptyRDD[Row],struct_dymic)
for(i <- 1 to 10)
{
val url_json_data = scala.io.Source.fromURL("https://webapiusr.mue/apii/0.05/?reslts=4554").mkString
val url_json_rdd = sc.parallelize(url_json_data::Nil) //To convert a string to RDD
val url_json_df = spark.read.option("multiline",true).json(url_json_rdd)
val zipcode_df = url_json_df.withColumn("results",explode(col("results")))
.select("results.user.name.first","results.user.gender","results.user.location.state","results.user.location.zip","results.user.phone")
df_empty.union(zipcode_df)
println("Curr val : " i)
}
df_empty.show()
}
}
Result:
#######
Curr val : 1
Curr val : 2
Curr val : 3
Curr val : 4
Curr val : 5
Curr val : 6
Curr val : 7
Curr val : 8
Curr val : 9
Curr val : 10
----- ------ ----- --- -----
|first|gender|state|zip|phone|
----- ------ ----- --- -----
----- ------ ----- --- -----
я намерен добавить все фреймы данных, созданные внутри цикла for, в один фрейм данных и записать конечный фрейм данных в target.
Я не знаю, почему он становится пустым.
Я попробовал этот подход в pyspark. Добавление фреймов данных в массив и объединение массива фреймов данных в один фрейм данных. Но в scala я не могу добавлять фреймы данных в массив. (массив фреймов данных)
С уважением, Динеш Кумар
Комментарии:
1.
df_empty
является неизменяемым. Повторный вызов ничегоdf_empty.union(zipcode_df)
не делает
Ответ №1:
Пример в Scala
import spark.implicits._
case class ReduceUnion (id: Int, v: String)
val l = Array.range(1,10)
val d = l.map(i => Seq(ReduceUnion(i, s"Test $i")).toDF())
val resultDF = d.reduce(_ union _)
resultDF.printSchema()
resultDF.show(false)
// root
// |-- id: integer (nullable = false)
// |-- v: string (nullable = true)
//
// --- ------
// |id |v |
// --- ------
// |1 |Test 1|
// |2 |Test 2|
// |3 |Test 3|
// |4 |Test 4|
// |5 |Test 5|
// |6 |Test 6|
// |7 |Test 7|
// |8 |Test 8|
// |9 |Test 9|
// --- ------