#scala #apache-spark #apache-spark-sql
#scala #apache-spark #apache-spark-sql
Вопрос:
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}
import org.apache.spark.sql.SparkSession
object APP{
def merge(srcPath: String, dstPath: String): Unit = {
val hadoopConfig = new Configuration()
val hdfs = FileSystem.get(hadoopConfig)
FileUtil.copyMerge(hdfs, new Path(srcPath), hdfs, new Path(dstPath), false, hadoopConfig, null)
// the "true" setting deletes the source files once they are merged into the new output
}
def main(args: Array[String]): Unit = {
val url = "jdbc:sqlserver://dc-bir-cdb01;databaseName=dbapp;integratedSecurity=true";
val driver = "com.microsoft.sqlserver.jdbc.SQLServerDriver"
val BusinessDate = "2019-02-28"
val destination = "src/main/resources/out/"
val filename = s"Example@$BusinessDate.csv.gz"
val outputFileName = destination "/temp_" filename
val mergedFileName = destination "/merged_" filename
val mergeFindGlob = outputFileName
val spark = SparkSession.
builder.master("local[*]")
//.config("spark.debug.maxToStringFields", "100")
.appName("Application Big Data")
.getOrCreate()
val query = s"""(SELECT a,b,c From table') tmp """.stripMargin
val responseWithSelectedColumns = spark
.read
.format("jdbc")
.option("url", url)
.option("driver", driver)
.option("dbtable", query)
.load()
print("TOTAL: " responseWithSelectedColumns.count())
responseWithSelectedColumns
.coalesce(1) //So just a single part- file will be created
.repartition(10)
.write.mode("overwrite")
.option("codec", "org.apache.hadoop.io.compress.GzipCodec")
.format("com.databricks.spark.csv")
.option("charset", "UTF8")
.option("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false") //Avoid creating of crc files
.option("header", "true") //Write the header
.save(outputFileName)
merge(mergeFindGlob, mergedFileName)
responseWithSelectedColumns.unpersist()
spark.stop()
}
}
Приведенный выше код создает файл с несколькими заголовками.
Как я должен изменить код, чтобы в файле был только один заголовок?
Комментарии:
1. Как насчет простого удаления
.repartition(10)
?
Ответ №1:
По сути, вы пытаетесь сгенерировать CSV-файлы только с одним заголовком для всех них. Одним из простых решений было бы использовать coalesce(1)
и удалить repartition(10)
тот, который вы ввели. Проблема в том, что все данные отправляются в один раздел. Это может быть очень медленно или, что еще хуже, выдавать ошибку ООМ. Тем не менее (если это сработает), вы получите один большой файл с одним заголовком.
Чтобы продолжать использовать преимущества параллелизма a в spark, вы можете записать заголовок отдельно следующим образом (предполагая, что у нас есть фрейм данных df
)
val output = "hdfs:///...path.../output.csv"
val merged_output = "hdfs:///...path.../merged_output.csv"
import spark.implicits._
// Let's build the header
val header = responseWithSelectedColumns
.schema.fieldNames.reduceLeft(_ "," _)
// Let's write the data
responseWithSelectedColumns.write.csv(output)
// Let's write the header without spark
val hadoopConfig = new Configuration()
val hdfs = FileSystem.get(hadoopConfig)
val f = hdfs.create(new Path(output "/header"))
f.write(header.getBytes)
f.close()
// Let's merge everything into one file
FileUtil.copyMerge(hdfs, new Path(output), hdfs, new Path(merged_output),
false,hadoopConfig, null)
Также обратите внимание, что spark 2.x поддерживает запись csv из коробки. Это то, что я использовал вместо библиотеки databricks, которая делает вещи немного более подробными.
Комментарии:
1. Можете ли вы помочь мне предоставить полное кодовое решение из моего фрагмента кода.?
2. Я проверил свой код, и по какой-то причине заголовок не всегда появляется первым. Позвольте мне изменить свой ответ и добавить больше деталей.