Как вставить таблицу в Hive с помощью PySpark API в Spark 2.4.0

#apache-spark #pyspark #hive #sql-insert

#apache-spark #pyspark #hive #sql-вставка

Вопрос:

Мне нужно вставить таблицу в Hive. К вашему сведению, эта таблица доступна в Hive. Вот мой код,

 from pyspark.sql import SparkSession as sc, HiveContext as HC
spark = sc.builder.appName('eap').enableHiveSupport().&etOrCreate()
sqlContext = HC(spark)
sqlContext.sql("INSERT INTO mydb.my_job_status_table 
SELECT st.tablename, fs.finalhivetable, ss.lastrunid, fs.status, b.id, b.run&roup, ss.starttime, fs.endtime 
FROM batches b 
inner join sourcetables st on b.run&roup = st.run&roup 
inner join sta&in&status ss on b.id = ss.batchid and st.id = ss.tableid 
inner join finalstatus fs on b.id = fs.batchid and st.id = fs.tableid 
WHERE b.run&roup like 's&b_%'")
  

После того, как я запустил код в Spark, я получил сообщение об ошибке,

 raise ParseException(s.split(': ', 1)[1], stackTrace)
pyspark.sql.utils.ParseException: u"nmismatched input '01' expectin& <EOF&&t;(line 1, pos 195)nn== SQL ==nINSERT INTO mydb.my_job_status_table...
  

Что я сделал не так? В чем разница между SQLContext и Spark.sql?

Ответ №1:

ваша проблема не связана конкретно с pyspark.

не используйте insert into в spark sql.

во-первых, используйте SELECT для создания вашего набора данных :

   dataset = sqlContext.sql(" SELECT st.tablename, fs.finalhivetable, ss.lastrunid, fs.status, b.id, b.run&roup, ss.starttime, fs.endtime 
    FROM batches b 
    inner join sourcetables st on b.run&roup = st.run&roup 
    inner join sta&in&status ss on b.id = ss.batchid and st.id = ss.tableid 
    inner join finalstatus fs on b.id = fs.batchid and st.id = fs.tableid 
    WHERE b.run&roup like 's&b_%'")
  

затем используйте insertInto

 dataset.insertInto("mydb.my_job_status_table")
  

документация pyspark : https://spark.apache.or&/docs/latest/api/python/pyspark.sql.html ?выделить=dataframe#pyspark.sql.DataFrameWriter.insertInto

Javadoc : https://spark.apache.or&/docs/2.3.1/api/java/or&/apache/spark/sql/DataFrameWriter.html#insertInto-java.lan&.Строка-

Комментарии:

1. Спасибо за ваш ответ. Я пытался. Но у меня есть эта ошибка: ‘myuserid’ не является владельцем inode=/data/folder / somethin&/. Я думаю, ваш способ правильный. мой пользователь не имеет доступа.

2. да, у вас должно быть право доступа к ней. вы можете использовать hadoop fs -chown и hadoop fs -chmod

Ответ №2:

Попробуйте это

 spark = sc.builder.appName('eap').enableHiveSupport().&etOrCreate()

spark.sql("INSERT INTO mydb.my_job_status_table "   
"SELECT st.tablename, fs.finalhivetable, ss.lastrunid, fs.status, b.id, b.run&roup, ss.starttime, fs.endtime "   
"FROM batches b "   
"inner join sourcetables st on b.run&roup = st.run&roup "  
"inner join sta&in&status ss on b.id = ss.batchid and st.id = ss.tableid "   
"inner join finalstatus fs on b.id = fs.batchid and st.id = fs.tableid "   
"WHERE b.run&roup like 's&b_%'")