#hadoop #hive #spark-streaming
#hadoop #hive #потоковая передача данных
Вопрос:
Я пытаюсь сохранить поток данных, поступающих из темы kafka, в таблицу разделов hive. Я смог преобразовать dstream в dataframe и создал контекст hive. Мой код выглядит следующим образом
val hiveContext = new HiveContext(sc)
hiveContext.setConf("hive.exec.dynamic.partition", "true")
hiveContext.setConf("hive.exec.dynamic.partition.mode", "nonstrict")
newdf.registerTempTable("temp") //newdf is my dataframe
newdf.write.mode(SaveMode.Append).format("osv").partitionBy("date").saveAsTable("mytablename")
Но когда я развертываю приложение в кластере, оно говорит
Exception in thread "main" java.lang.IllegalArgumentException: Wrong FS: file:/tmp/spark-3f00838b-c5d9-4a9a-9818-11fbb0007076/scratch_hive_2016-10-18_23-18-33_118_769650074381029645-1, expected: hdfs://
Когда я пытаюсь сохранить его как обычную таблицу и прокомментировать настройки hiveconfigurations, это работает. Но с таблицей разделов … это выдает мне эту ошибку.
Я также попытался зарегистрировать фрейм данных как временную таблицу, а затем записать эту таблицу в таблицу разделов. Выполнение этого также дало мне ту же ошибку
Может кто-нибудь, пожалуйста, рассказать, как я могу это решить. Спасибо.
Ответ №1:
Вам необходимо использовать hadoop (hdfs), настроенный, если вы развертываете приложение в кластере.
С помощью saveAsTable местоположение по умолчанию, в которое сохраняется Spark, контролируется хранилищем данных HiveMetaStore (на основе документов). Другим вариантом было бы использовать saveAsParquetFile и указать путь, а затем позже зарегистрировать этот путь в вашем метасторе hive ИЛИ использовать новый интерфейс DataFrameWriter и указать параметр path write.format(source).mode(mode).options(options).saveAsTable(tableName).
Ответ №2:
Я понял это. В коде для приложения spark я объявил местоположение scratch dir, как показано ниже, и это сработало.
sqlContext.sql("SET hive.exec.scratchdir=<hdfs location>")
Ответ №3:
SQLContext.sql(«УСТАНОВИТЬ hive.exec.scratchdir=location»)