An error occurred while calling o111.pyWriteDynamicFrame. Exception thrown in awaitResult:

#pyspark #apache-spark-sql #amazon-redshift #aws-glue

Question:

I have been getting this error for a long time now and I really don't know what is wrong with my job. Earlier the error said that my columns were not long enough, but after I increased the column lengths I still get the same error. I also looked in the stl_load_errors table, but it is hard to tell which entry belongs to the most recent run. Please help me figure out what is going wrong.
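
For reference, a query along these lines (a sketch, assuming the standard stl_load_errors columns such as colname, col_length, raw_field_value and err_reason) can be run directly in Redshift to surface the most recent load failures, so the column and value that triggered the error are visible:

    select starttime, filename, colname, type, col_length,
           raw_field_value, err_reason
    from stl_load_errors
    order by starttime desc
    limit 10;

Sorting by starttime descending puts the entries from the latest run at the top.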

I created a table in Redshift:

    create table my_schema.Tests (
        angaza_id varchar(256), created_utc varchar(256), username varchar(65535),
        primary_phone varchar(256), role varchar(65535), balance varchar(256),
        archived varchar(65535), country varchar(256), first_name varchar(65535),
        last_name varchar(65535), parent_user varchar(65535), organization varchar(65535),
        email varchar(65535), assigned_pricing_groups varchar(65535)
    )
 

GLUE JOB

    import sys
    from awsglue.transforms import *
    from awsglue.utils import getResolvedOptions
    from pyspark.context import SparkContext
    from awsglue.dynamicframe import DynamicFrame
    from awsglue.context import GlueContext
    from awsglue.job import Job
    from pyspark.sql.types import *
    from pyspark.sql.functions import *

    from pyspark import SparkConf, SparkContext
    from pyspark.sql import *
    import pyspark.sql.functions as F

    args = getResolvedOptions(sys.argv, ['TempDir', 'JOB_NAME'])

    conf = SparkConf()
    sc = SparkContext()

    glueContext = GlueContext(sc)
    spark = glueContext.spark_session

    job = Job(glueContext)
    job.init(args['JOB_NAME'], args)


    l_table = "s3://mygluecrawlerbucket/UsersData/l_users.csv"
    m_table = "s3://mygluecrawlerbucket/UsersData/m_users.csv"
    n_table = "s3://mygluecrawlerbucket/UsersData/n_users.csv"
    a_table = "s3://mygluecrawlerbucket/UsersData/a_users.csv"
    b_table = "s3://mygluecrawlerbucket/UsersData/b_users.csv"
    c_table = "s3://mygluecrawlerbucket/UsersData/c_users.csv"


    angaza_ke = (spark.read.format("csv")
        .option("header", "true").option("inferSchema", "true")
        .load(l_table))
    angaza_ug = (spark.read.format("csv")
        .option("header", "true").option("inferSchema", "true")
        .load(m_table))
    angaza_zm = (spark.read.format("csv")
        .option("header", "true").option("inferSchema", "true")
        .load(n_table))
    angaza_tz = (spark.read.format("csv")
        .option("header", "true").option("inferSchema", "true")
        .load(a_table))
    angaza_ng = (spark.read.format("csv")
        .option("header", "true").option("inferSchema", "true")
        .load(b_table))
    angaza_mm = (spark.read.format("csv")
        .option("header", "true").option("inferSchema", "true")
        .load(c_table))

    angaza_ug.createOrReplaceTempView("angaza_ug")
    angaza_ke.createOrReplaceTempView("angaza_ke")
    angaza_zm.createOrReplaceTempView("angaza_zm")
    angaza_tz.createOrReplaceTempView("angaza_tz")
    angaza_ng.createOrReplaceTempView("angaza_ng")
    angaza_mm.createOrReplaceTempView("angaza_mm")




    sparkJoin = spark.sql("""SELECT * FROM (
        SELECT angaza_id, created_utc, username, primary_phone, role, 'balance_kes' AS balance,
               archived, 'Kenya' AS country, first_name, last_name, parent_user,
               organization, email, assigned_pricing_groups
        FROM angaza_ke
        UNION ALL
        SELECT angaza_id, created_utc, username, primary_phone, role, 'balance_mmk' AS balance,
               archived, 'Myanmar (Burma)' AS country, first_name, last_name, NULL AS parent_user,
               organization, email, assigned_pricing_groups
        FROM angaza_mm
        UNION ALL
        SELECT angaza_id, created_utc, username, primary_phone, role, 'balance_ugx' AS balance,
               archived, 'Uganda' AS country, first_name, last_name, parent_user,
               organization, email, assigned_pricing_groups
        FROM angaza_ug
        UNION ALL
        SELECT angaza_id, created_utc, username, primary_phone, role, 'balance_ngn' AS balance,
               archived, 'Nigeria' AS country, first_name, last_name, parent_user,
               organization, email, assigned_pricing_groups
        FROM angaza_ng
        UNION ALL
        SELECT angaza_id, created_utc, username, primary_phone, role, 'balance_tzs' AS balance,
               archived, 'Tanzania' AS country, first_name, last_name, parent_user,
               organization, email, assigned_pricing_groups
        FROM angaza_tz
        UNION ALL
        SELECT angaza_id, created_utc, username, primary_phone, role, 'balance_zmw' AS balance,
               archived, 'Zambia' AS country, first_name, last_name,
               "parent_user_optional" AS parent_user, organization, email, assigned_pricing_groups
        FROM angaza_zm
    )""")




    dynamic_df = DynamicFrame.fromDF(sparkJoin, glueContext, "dynamic_df")

    mapped_df = ResolveChoice.apply(frame=dynamic_df, choice="make_cols",
                                    transformation_ctx="mapped_df")

    datasink = glueContext.write_dynamic_frame.from_jdbc_conf(
        frame=mapped_df,
        catalog_connection="redshift-new-connection",
        connection_options={"dbtable": "my_schema.Tests", "database": "dev"},
        redshift_tmp_dir=args["TempDir"],
        transformation_ctx="datasink"
    )