Как сохранить строку как тип JSONB в postgres при использовании AWS Glue

#postgresql #amazon-web-services #pyspark #aws-glue #jsonb

#postgresql #amazon-веб-сервисы #pyspark #aws-glue #jsonb

Вопрос:

Я ищу решение о том, как записать строку как тип jsonb в postgresql. Итак, DynamicFrame имеет столбец string, который содержит данные json. При попытке сохранить в postgres

 DataSink0 = glueContext.write_dynamic_frame.from_catalog(frame = Transform0, database = "cms", table_name = "cms_public_listings", transformation_ctx = "DataSink0")
  

Я получаю следующую ошибку:

Произошла ошибка:

 An error occurred while calling o1623.pyWriteDynamicFrame.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 134.0 failed 4 times, most recent failure: Lost task 0.3 in stage 134.0 (TID 137, ip-172-31-27-18.ec2.internal, executor 24): java.sql.BatchUpdateException: Batch entry 0 INSERT INTO "public".listings ([REMOVED_COLUMNS]) VALUES ([REMOVED_VALUES]) was aborted: ERROR: column "schema" is of type jsonb but expression is of type character varying
  Hint: You will need to rewrite or cast the expression.
  Position: 207  Call getNextException to see other errors in the batch.
  

Я не могу изменить схему для хранения строки, поэтому либо я использую AWS Glue ETL, либо мне придется создавать задание оболочки Python. Я бы предпочел найти способ использовать PySpark с AWS Glue.

Ответ №1:

Я предпочитаю использовать собственный spark dataframe, потому что это позволяет мне больше настраивать.Я могу использовать свойство stringtype для преобразования поля json из фрейма данных в поле jsonb в таблице. В этом случае мой фрейм данных содержит два поля.

 from pyspark import SparkConf

sc = SparkContext.getOrCreate(SparkConf())
spark = SparkSession(sc)

df = spark.read.format('csv') 
               .option('delimiter','|') 
               .option('header','True') 
               .load('your_path') 

##some transformation...

url = 'jdbc:postgresql://your_host:5432/your_databasename'
properties = {'user':'*****',
              'password':'*****',
              'driver': "org.postgresql.Driver",
              'stringtype':"unspecified"}
        
df.write.jdbc(url=url, table='your_tablename', mode='append', properties=properties)
  

Перед выполнением вышеупомянутого скрипта вам следует создать таблицу в postgresql, поскольку режим свойств установлен как append . Это выглядит следующим образом:

 create table your_tablename
(
    my_json_field jsonb,
    another_field int
)