#pyspark #apache-spark-sql #amazon-redshift #aws-glue
Question:
I have been getting this error for a long time now and I really don't know what is wrong with my job; it now fails on every run. Please help me resolve it. Earlier it said that my columns were not long enough, but after I increased the lengths it gave me the same error again. I also looked in the stl_load_errors table, but it is hard to tell which entries belong to the most recent run. Please tell me what is going wrong.
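For reference, sorting stl_load_errors by its starttime column is one way to surface the newest failures first (a sketch against the standard Redshift system table; err_reason and raw_field_value show which column and value were rejected):

SELECT starttime, filename, line_number, colname, col_length,
       err_reason, raw_field_value
FROM stl_load_errors
ORDER BY starttime DESC
LIMIT 20;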
I created this table in Redshift:
create table my_schema.Tests (
    angaza_id varchar(256),
    created_utc varchar(256),
    username varchar(65535),
    primary_phone varchar(256),
    role varchar(65535),
    balance varchar(256),
    archived varchar(65535),
    country varchar(256),
    first_name varchar(65535),
    last_name varchar(65535),
    parent_user varchar(65535),
    organization varchar(65535),
    email varchar(65535),
    assigned_pricing_groups varchar(65535)
);
GLUE JOB
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from awsglue.dynamicframe import DynamicFrame
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.context import SparkContext
import pyspark.sql.functions as F

args = getResolvedOptions(sys.argv, ['TempDir', 'JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
l_table = "s3://mygluecrawlerbucket/UsersData/l_users.csv"
m_table = "s3://mygluecrawlerbucket/UsersData/m_users.csv"
n_table = "s3://mygluecrawlerbucket/UsersData/n_users.csv"
a_table = "s3://mygluecrawlerbucket/UsersData/a_users.csv"
b_table = "s3://mygluecrawlerbucket/UsersData/b_users.csv"
c_table = "s3://mygluecrawlerbucket/UsersData/c_users.csv"
angaza_ke = (spark.read.format("csv")
             .option("header", "true").option("inferSchema", "true")
             .load(l_table))
angaza_ug = (spark.read.format("csv")
             .option("header", "true").option("inferSchema", "true")
             .load(m_table))
angaza_zm = (spark.read.format("csv")
             .option("header", "true").option("inferSchema", "true")
             .load(n_table))
angaza_tz = (spark.read.format("csv")
             .option("header", "true").option("inferSchema", "true")
             .load(a_table))
angaza_ng = (spark.read.format("csv")
             .option("header", "true").option("inferSchema", "true")
             .load(b_table))
angaza_mm = (spark.read.format("csv")
             .option("header", "true").option("inferSchema", "true")
             .load(c_table))
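# Each file goes through inferSchema independently, so the same column can
# be inferred with different types in different files (e.g. a phone number
# as long in one CSV, string in another). A sketch, not part of the job as
# it runs today, of forcing every frame to all-string columns so the UNION
# branches line up with the all-varchar target table:
def all_strings(df):
    # Cast every column to string to rule out mixed types downstream.
    return df.select([F.col(c).cast("string") for c in df.columns])
# angaza_ke = all_strings(angaza_ke)  # ...and likewise for the other five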
angaza_ug.createOrReplaceTempView("angaza_ug")
angaza_ke.createOrReplaceTempView("angaza_ke")
angaza_zm.createOrReplaceTempView("angaza_zm")
angaza_tz.createOrReplaceTempView("angaza_tz")
angaza_ng.createOrReplaceTempView("angaza_ng")
angaza_mm.createOrReplaceTempView("angaza_mm")
sparkJoin = spark.sql("""SELECT * FROM (
    SELECT angaza_id, created_utc, username, primary_phone, role,
           'balance_kes' AS balance, archived, 'Kenya' AS country,
           first_name, last_name, parent_user, organization, email,
           assigned_pricing_groups
    FROM angaza_ke
    UNION ALL
    SELECT angaza_id, created_utc, username, primary_phone, role,
           'balance_mmk' AS balance, archived, 'Myanmar (Burma)' AS country,
           first_name, last_name, NULL AS parent_user, organization, email,
           assigned_pricing_groups
    FROM angaza_mm
    UNION ALL
    SELECT angaza_id, created_utc, username, primary_phone, role,
           'balance_ugx' AS balance, archived, 'Uganda' AS country,
           first_name, last_name, parent_user, organization, email,
           assigned_pricing_groups
    FROM angaza_ug
    UNION ALL
    SELECT angaza_id, created_utc, username, primary_phone, role,
           'balance_ngn' AS balance, archived, 'Nigeria' AS country,
           first_name, last_name, parent_user, organization, email,
           assigned_pricing_groups
    FROM angaza_ng
    UNION ALL
    SELECT angaza_id, created_utc, username, primary_phone, role,
           'balance_tzs' AS balance, archived, 'Tanzania' AS country,
           first_name, last_name, parent_user, organization, email,
           assigned_pricing_groups
    FROM angaza_tz
    UNION ALL
    SELECT angaza_id, created_utc, username, primary_phone, role,
           'balance_zmw' AS balance, archived, 'Zambia' AS country,
           first_name, last_name, "parent_user_optional" AS parent_user,
           organization, email, assigned_pricing_groups
    FROM angaza_zm
)""")
dynamic_df = DynamicFrame.fromDF(sparkJoin, glueContext, "dynamic_df")
mapped_df = ResolveChoice.apply(frame=dynamic_df, choice="make_cols",
                                transformation_ctx="mapped_df")
datasink = glueContext.write_dynamic_frame.from_jdbc_conf(
    frame=mapped_df,
    catalog_connection="redshift-new-connection",
    connection_options={"dbtable": "my_schema.Tests", "database": "dev"},
    redshift_tmp_dir=args["TempDir"],
    transformation_ctx="datasink",
)
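A check that could go just before the write (a sketch): dump the schema mapped_df actually carries, since any extra <column>_<type> fields created by make_cols would be sent to Redshift with no matching column in my_schema.Tests.

# Inspect the columns actually being written to Redshift.
mapped_df.printSchema()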