сохранение фрейма данных в таблицу разделов hive в spark

#hadoop #hive #spark-streaming

#hadoop #hive #потоковая передача данных

Вопрос:

Я пытаюсь сохранить поток данных, поступающих из темы kafka, в таблицу разделов hive. Я смог преобразовать dstream в dataframe и создал контекст hive. Мой код выглядит следующим образом

 val hiveContext = new HiveContext(sc)
hiveContext.setConf("hive.exec.dynamic.partition", "true")
hiveContext.setConf("hive.exec.dynamic.partition.mode", "nonstrict")
newdf.registerTempTable("temp") //newdf is my dataframe
newdf.write.mode(SaveMode.Append).format("osv").partitionBy("date").saveAsTable("mytablename")
  

Но когда я развертываю приложение в кластере, оно говорит

 Exception in thread "main" java.lang.IllegalArgumentException: Wrong FS: file:/tmp/spark-3f00838b-c5d9-4a9a-9818-11fbb0007076/scratch_hive_2016-10-18_23-18-33_118_769650074381029645-1, expected: hdfs://
  

Когда я пытаюсь сохранить его как обычную таблицу и прокомментировать настройки hiveconfigurations, это работает. Но с таблицей разделов … это выдает мне эту ошибку.

Я также попытался зарегистрировать фрейм данных как временную таблицу, а затем записать эту таблицу в таблицу разделов. Выполнение этого также дало мне ту же ошибку

Может кто-нибудь, пожалуйста, рассказать, как я могу это решить. Спасибо.

Ответ №1:

Вам необходимо использовать hadoop (hdfs), настроенный, если вы развертываете приложение в кластере.

С помощью saveAsTable местоположение по умолчанию, в которое сохраняется Spark, контролируется хранилищем данных HiveMetaStore (на основе документов). Другим вариантом было бы использовать saveAsParquetFile и указать путь, а затем позже зарегистрировать этот путь в вашем метасторе hive ИЛИ использовать новый интерфейс DataFrameWriter и указать параметр path write.format(source).mode(mode).options(options).saveAsTable(tableName).

Ответ №2:

Я понял это. В коде для приложения spark я объявил местоположение scratch dir, как показано ниже, и это сработало.

 sqlContext.sql("SET hive.exec.scratchdir=<hdfs location>")
  

Ответ №3:

SQLContext.sql(«УСТАНОВИТЬ hive.exec.scratchdir=location»)