Динамический фрейм в AWS Glue выбрасывает исключение SchemaBuilderException: Record is incomplete

#python #amazon-web-services #amazon-s3 #aws-glue #amazon-athena

#python #amazon-web-services #amazon-s3 #aws-glue #amazon-athena

Вопрос:

Я создал фрейм данных, используя приведенный ниже код:

 input1 : test.csv

     id,score,type,date
    41,0.4,2,2020-12-19
    42,0.41,2,2020-12-19

t_tbl="test"
t_db="test"
source_df=spark.read.option("delimiter",",").option("header",  "true").schema(schema).csv(source_path + '//test.csv')
tdf = DynamicFrame.fromDF(s_df, glueContext, "s_df")
target_path="s3://test//test"
sink = glueContext.getSink(connection_type="s3", path=target_path,enableUpdateCatalog=True,
              updateBehavior="UPDATE_IN_DATABASE",partitionKeys='')
sink.setFormat("glueparquet")
sink.setCatalogInfo(catalogDatabase=t_db, catalogTableName=t_tbl)
sink.writeFrame(tdf)
 

Я могу записать динамический фрейм без каких-либо ошибок и могу запросить данные в Athena.

Затем я создаю новый фрейм данных из test1.csv с помощью приведённого ниже кода (он такой же, как и код выше):

 input 2: test1.csv

       id,score,type,date
        43,0.4,2,2020-12-19
        44,0.41,2,2020-12-19


t_tbl="test"
t_db="test"
source_df=spark.read.option("delimiter",",").option("header", "true").schema(schema).csv(source_path + '//test1.csv')
 
tdf = DynamicFrame.fromDF(s_df, glueContext, "s_df")
target_path="s3://test//test"
sink = glueContext.getSink(connection_type="s3", path=target_path,enableUpdateCatalog=True,
              updateBehavior="UPDATE_IN_DATABASE",partitionKeys='')
sink.setFormat("glueparquet")
sink.setCatalogInfo(catalogDatabase=t_db, catalogTableName=t_tbl)
sink.writeFrame(tdf)
 

Теперь я получаю ошибку, показанную ниже:

Произошла ошибка:

 An error occurred while calling o386.pyWriteDynamicFrame.
: com.amazonaws.services.glue.schema.builders.SchemaBuilderException: Record is incomplete.
    at com.amazonaws.services.glue.schema.builders.SchemaBuilder.build(SchemaBuilder.java:301)
    at com.amazonaws.services.glue.schema.Schema.withoutField(Schema.java:604)
    at com.amazonaws.services.glue.schema.Schema.withoutFields(Schema.java:611)
    at com.amazonaws.services.glue.sinks.HadoopDataSink$anonfun$writeDynamicFrame$1.apply(HadoopDataSink.scala:176)
    at com.amazonaws.services.glue.sinks.HadoopDataSink$anonfun$writeDynamicFrame$1.apply(HadoopDataSink.scala:148)
    at com.amazonaws.services.glue.util.FileSchemeWrapper$anonfun$executeWithQualifiedScheme$1.apply(FileSchemeWrapper.scala:66)
    at com.amazonaws.services.glue.util.FileSchemeWrapper$anonfun$executeWithQualifiedScheme$1.apply(FileSchemeWrapper.scala:66)
    at com.amazonaws.services.glue.util.FileSchemeWrapper.executeWith(FileSchemeWrapper.scala:58)
    at com.amazonaws.services.glue.util.FileSchemeWrapper.executeWithQualifiedScheme(FileSchemeWrapper.scala:66)
    at com.amazonaws.services.glue.sinks.HadoopDataSink.writeDynamicFrame(HadoopDataSink.scala:147)
    at com.amazonaws.services.glue.DataSink.pyWriteDynamicFrame(DataSink.scala:63)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)

Traceback (most recent call last):
  File "<stdin>", line 20, in write_to_sink
  File "/mnt/yarn/usercache/livy/appcache/application_1609223793940_0004/container_1609223793940_0004_01_000001/PyGlue.zip/awsglue/data_sink.py", line 31, in writeFrame
    return DynamicFrame(self._jsink.pyWriteDynamicFrame(dynamic_frame._jdf, callsite(), info), dynamic_frame.glue_ctx, dynamic_frame.name + "_errors")
  File "/mnt/yarn/usercache/livy/appcache/application_1609223793940_0004/container_1609223793940_0004_01_000001/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/mnt/yarn/usercache/livy/appcache/application_1609223793940_0004/container_1609223793940_0004_01_000001/pyspark.zip/pyspark/sql/utils.py", line 63, in deco
    return f(*a, **kw)
  File "/mnt/yarn/usercache/livy/appcache/application_1609223793940_0004/container_1609223793940_0004_01_000001/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
    format(target_id, ".", name), value)
 

Ответ №1:

Пожалуйста, удалите параметр partitionKeys=''. Указывайте его только тогда, когда у выходных данных действительно есть ключ секционирования. После удаления этого параметра проблема будет решена.