Заголовок слияния, чтобы в файле был только один заголовок

#scala #apache-spark #apache-spark-sql

#scala #apache-spark #apache-spark-sql

Вопрос:

     import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}
    import org.apache.spark.sql.SparkSession

    object APP{

      def merge(srcPath: String, dstPath: String): Unit = {
        val hadoopConfig = new Configuration()
        val hdfs = FileSystem.get(hadoopConfig)
        FileUtil.copyMerge(hdfs, new Path(srcPath), hdfs, new Path(dstPath), false, hadoopConfig, null)
        // the "true" setting deletes the source files once they are merged into the new output
      }

      def main(args: Array[String]): Unit = {

        val url = "jdbc:sqlserver://dc-bir-cdb01;databaseName=dbapp;integratedSecurity=true";
        val driver = "com.microsoft.sqlserver.jdbc.SQLServerDriver"
        val BusinessDate = "2019-02-28"
        val destination = "src/main/resources/out/"
        val filename = s"Example@$BusinessDate.csv.gz"
        val outputFileName = destination   "/temp_"   filename
        val mergedFileName = destination   "/merged_"   filename
        val mergeFindGlob = outputFileName


        val spark = SparkSession.
          builder.master("local[*]")
          //.config("spark.debug.maxToStringFields", "100")
          .appName("Application Big Data")
          .getOrCreate()
        val query = s"""(SELECT a,b,c From table') tmp """.stripMargin

        val responseWithSelectedColumns = spark
          .read
          .format("jdbc")
          .option("url", url)
          .option("driver", driver)
          .option("dbtable", query)
          .load()

        print("TOTAL: " responseWithSelectedColumns.count())

        responseWithSelectedColumns
          .coalesce(1) //So just a single part- file will be created
          .repartition(10)
          .write.mode("overwrite")
          .option("codec", "org.apache.hadoop.io.compress.GzipCodec")
          .format("com.databricks.spark.csv")
          .option("charset", "UTF8")
          .option("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false") //Avoid creating of crc files
          .option("header", "true") //Write the header

          .save(outputFileName)
        merge(mergeFindGlob, mergedFileName)
        responseWithSelectedColumns.unpersist()

        spark.stop()
      }
    }
  

Приведенный выше код создает файл с несколькими заголовками.

Как я должен изменить код, чтобы в файле был только один заголовок?

Комментарии:

1. Как насчет простого удаления .repartition(10) ?

Ответ №1:

По сути, вы пытаетесь сгенерировать CSV-файлы только с одним заголовком для всех них. Одним из простых решений было бы использовать coalesce(1) и удалить repartition(10) тот, который вы ввели. Проблема в том, что все данные отправляются в один раздел. Это может быть очень медленно или, что еще хуже, выдавать ошибку ООМ. Тем не менее (если это сработает), вы получите один большой файл с одним заголовком.

Чтобы продолжать использовать преимущества параллелизма a в spark, вы можете записать заголовок отдельно следующим образом (предполагая, что у нас есть фрейм данных df )

     val output = "hdfs:///...path.../output.csv"
    val merged_output = "hdfs:///...path.../merged_output.csv"

    import spark.implicits._
    // Let's build the header
    val header = responseWithSelectedColumns
        .schema.fieldNames.reduceLeft(_ "," _)

    // Let's write the data
    responseWithSelectedColumns.write.csv(output)

    // Let's write the header without spark
    val hadoopConfig = new Configuration()
    val hdfs = FileSystem.get(hadoopConfig)
    val f = hdfs.create(new Path(output   "/header"))
    f.write(header.getBytes)
    f.close()

    // Let's merge everything into one file
    FileUtil.copyMerge(hdfs, new Path(output), hdfs, new Path(merged_output),
                                    false,hadoopConfig, null)

  

Также обратите внимание, что spark 2.x поддерживает запись csv из коробки. Это то, что я использовал вместо библиотеки databricks, которая делает вещи немного более подробными.

Комментарии:

1. Можете ли вы помочь мне предоставить полное кодовое решение из моего фрагмента кода.?

2. Я проверил свой код, и по какой-то причине заголовок не всегда появляется первым. Позвольте мне изменить свой ответ и добавить больше деталей.