#java #sql #apache-beam
#java #sql #apache-beam
Вопрос:
Я пытаюсь прочитать файл записи avro с помощью apache beam sql. Схема avro — sample.avsc, а соответствующий файл записи avro — sample.avro. Код выглядит следующим образом:
@DefaultCoder(AvroCoder.class)
public class ReadAvro {
public static void main(String[] args) throws IOException, CannotProvideCoderException {
PipelineOptions option = PipelineOptionsFactory.fromArgs(args).withValidation().create();
Pipeline p = Pipeline.create(option);
Schema schema = new Schema.Parser().parse(new File("/Users/m0k04ac/word-count-beam/src/main/java/org/apache/beam/examples/sample.avsc"));
PCollection<GenericRecord> pc = p.apply(AvroIO.readGenericRecords(schema).withBeamSchemas(true).from("/Users/m0k04ac/word-count-beam/src/main/java/org/apache/beam/examples/sample.avro"));
pc.apply("SQL transforms", SqlTransform.query("SELECT username FROM PCOLLECTION"))
.apply("Outputdata",
MapElements.via(
new SimpleFunction<Row, Row>() {
@Override
public Row apply(Row input) {
System.out.println("PCOLLECTION: " input.getValues());
return input;
}
}
)
);
p.run().waitUntilFinish();
}
}
Я получаю следующую ошибку:
Exception in thread "main" java.lang.IllegalStateException: Unable to return a default Coder for Outputdata/Map/ParMultiDo(Anonymous).output [PCollection]. Correct one of the following root causes:
No Coder has been manually specified; you may do so using .setCoder().
Inferring a Coder from the CoderRegistry failed: Cannot provide a coder for a Beam Row. Please provide a schema instead using PCollection.setRowSchema.
Using the default output Coder from the producing PTransform failed: PTransform.getOutputCoder called.
at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState(Preconditions.java:507)
at org.apache.beam.sdk.values.PCollection.getCoder(PCollection.java:278)
at org.apache.beam.sdk.values.PCollection.finishSpecifying(PCollection.java:114)
at org.apache.beam.sdk.runners.TransformHierarchy.finishSpecifying(TransformHierarchy.java:263)
at org.apache.beam.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:249)
at org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:463)
at org.apache.beam.sdk.Pipeline.validate(Pipeline.java:582)
at org.apache.beam.sdk.Pipeline.run(Pipeline.java:316)
at org.apache.beam.sdk.Pipeline.run(Pipeline.java:303)
at org.apache.beam.examples.ReadAvro.main(ReadAvro.java:39)
Может кто-нибудь сказать мне, что не так с кодом и как решить? И да, запись avro соответствует заданной схеме sample.avsc.