#python #amazon-web-services #amazon-s3 #aws-glue #amazon-athena
Question:
I created a data frame using the code below:
Input 1: test.csv
id,score,type,date
41,0.4,2,2020-12-19
42,0.41,2,2020-12-19
from awsglue.dynamicframe import DynamicFrame

t_tbl = "test"
t_db = "test"
# Read the CSV into a Spark DataFrame using the predefined schema
source_df = spark.read.option("delimiter", ",").option("header", "true").schema(schema).csv(source_path + '//test.csv')
tdf = DynamicFrame.fromDF(source_df, glueContext, "source_df")
target_path = "s3://test//test"
sink = glueContext.getSink(connection_type="s3", path=target_path, enableUpdateCatalog=True,
                           updateBehavior="UPDATE_IN_DATABASE", partitionKeys='')
sink.setFormat("glueparquet")
sink.setCatalogInfo(catalogDatabase=t_db, catalogTableName=t_tbl)
sink.writeFrame(tdf)
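The snippet references schema and source_path without defining them. A minimal sketch of what they might look like, inferred from the CSV header above (the bucket path and the column types are assumptions, not part of the question):

from pyspark.sql.types import StructType, StructField, IntegerType, DoubleType, StringType

# Hypothetical definitions matching the test.csv header; actual values are not shown in the question
schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("score", DoubleType(), True),
    StructField("type", IntegerType(), True),
    StructField("date", StringType(), True),
])
source_path = "s3://test"  # assumed source bucket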
I can write the dynamic frame without any errors and can query the data in Athena.
Then I create a new data frame from test1.csv by running the code below (the same as the code above):
Input 2: test1.csv
id,score,type,date
43,0.4,2,2020-12-19
44,0.41,2,2020-12-19
t_tbl = "test"
t_db = "test"
source_df = spark.read.option("delimiter", ",").option("header", "true").schema(schema).csv(source_path + '//test1.csv')
tdf = DynamicFrame.fromDF(source_df, glueContext, "source_df")
target_path = "s3://test//test"
sink = glueContext.getSink(connection_type="s3", path=target_path, enableUpdateCatalog=True,
                           updateBehavior="UPDATE_IN_DATABASE", partitionKeys='')
sink.setFormat("glueparquet")
sink.setCatalogInfo(catalogDatabase=t_db, catalogTableName=t_tbl)
sink.writeFrame(tdf)
Now I get the error shown below:
An error occurred while calling o386.pyWriteDynamicFrame.
: com.amazonaws.services.glue.schema.builders.SchemaBuilderException: Record is incomplete.
at com.amazonaws.services.glue.schema.builders.SchemaBuilder.build(SchemaBuilder.java:301)
at com.amazonaws.services.glue.schema.Schema.withoutField(Schema.java:604)
at com.amazonaws.services.glue.schema.Schema.withoutFields(Schema.java:611)
at com.amazonaws.services.glue.sinks.HadoopDataSink$$anonfun$writeDynamicFrame$1.apply(HadoopDataSink.scala:176)
at com.amazonaws.services.glue.sinks.HadoopDataSink$$anonfun$writeDynamicFrame$1.apply(HadoopDataSink.scala:148)
at com.amazonaws.services.glue.util.FileSchemeWrapper$$anonfun$executeWithQualifiedScheme$1.apply(FileSchemeWrapper.scala:66)
at com.amazonaws.services.glue.util.FileSchemeWrapper$$anonfun$executeWithQualifiedScheme$1.apply(FileSchemeWrapper.scala:66)
at com.amazonaws.services.glue.util.FileSchemeWrapper.executeWith(FileSchemeWrapper.scala:58)
at com.amazonaws.services.glue.util.FileSchemeWrapper.executeWithQualifiedScheme(FileSchemeWrapper.scala:66)
at com.amazonaws.services.glue.sinks.HadoopDataSink.writeDynamicFrame(HadoopDataSink.scala:147)
at com.amazonaws.services.glue.DataSink.pyWriteDynamicFrame(DataSink.scala:63)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:748)
Traceback (most recent call last):
File "<stdin>", line 20, in write_to_sink
File "/mnt/yarn/usercache/livy/appcache/application_1609223793940_0004/container_1609223793940_0004_01_000001/PyGlue.zip/awsglue/data_sink.py", line 31, in writeFrame
return DynamicFrame(self._jsink.pyWriteDynamicFrame(dynamic_frame._jdf, callsite(), info), dynamic_frame.glue_ctx, dynamic_frame.name + "_errors")
File "/mnt/yarn/usercache/livy/appcache/application_1609223793940_0004/container_1609223793940_0004_01_000001/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
answer, self.gateway_client, self.target_id, self.name)
File "/mnt/yarn/usercache/livy/appcache/application_1609223793940_0004/container_1609223793940_0004_01_000001/pyspark.zip/pyspark/sql/utils.py", line 63, in deco
return f(*a, **kw)
File "/mnt/yarn/usercache/livy/appcache/application_1609223793940_0004/container_1609223793940_0004_01_000001/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
format(target_id, ".", name), value)
Answer #1:
Please remove partitionKeys=''. Specify it only when the output actually has a partition key; with the empty string removed, the problem will be resolved.
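In other words, either drop the argument entirely or pass a list of real column names. A sketch of both variants against the code above (using "date" as an example partition column taken from the input; whether you want to partition by it is your call):

# Variant 1: unpartitioned output -- omit partitionKeys altogether
sink = glueContext.getSink(connection_type="s3", path=target_path, enableUpdateCatalog=True,
                           updateBehavior="UPDATE_IN_DATABASE")
sink.setFormat("glueparquet")
sink.setCatalogInfo(catalogDatabase=t_db, catalogTableName=t_tbl)
sink.writeFrame(tdf)

# Variant 2: partitioned output -- pass a list of column names, never ''
sink = glueContext.getSink(connection_type="s3", path=target_path, enableUpdateCatalog=True,
                           updateBehavior="UPDATE_IN_DATABASE", partitionKeys=["date"])
sink.setFormat("glueparquet")
sink.setCatalogInfo(catalogDatabase=t_db, catalogTableName=t_tbl)
sink.writeFrame(tdf)

The stack trace (Schema.withoutField / withoutFields) suggests Glue tries to strip the partition columns from the record schema before building the Parquet schema; an empty-string "column" has no matching field, which appears to be what triggers the "Record is incomplete" SchemaBuilderException.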