Java Beam sql java.lang.Исключение IllegalStateException при чтении файла записи avro с заданной схемой avro

#java #sql #apache-beam

#java #sql #apache-beam

Вопрос:

Я пытаюсь прочитать файл записи avro с помощью apache beam sql. Схема avro — sample.avsc, а соответствующий файл записи avro — sample.avro. Код выглядит следующим образом:

 @DefaultCoder(AvroCoder.class)
public class ReadAvro {
  public static void main(String[] args) throws IOException, CannotProvideCoderException {
    PipelineOptions option = PipelineOptionsFactory.fromArgs(args).withValidation().create();
    Pipeline p = Pipeline.create(option);

    Schema schema = new Schema.Parser().parse(new File("/Users/m0k04ac/word-count-beam/src/main/java/org/apache/beam/examples/sample.avsc"));
    PCollection<GenericRecord> pc = p.apply(AvroIO.readGenericRecords(schema).withBeamSchemas(true).from("/Users/m0k04ac/word-count-beam/src/main/java/org/apache/beam/examples/sample.avro"));
    pc.apply("SQL transforms", SqlTransform.query("SELECT username FROM PCOLLECTION"))
        .apply("Outputdata",
            MapElements.via(
                new SimpleFunction<Row, Row>() {
                  @Override
                  public Row apply(Row input) {
                    System.out.println("PCOLLECTION: "   input.getValues());
                    return input;
                  }
                }
            )
        );
    p.run().waitUntilFinish();
  }
}
  

Я получаю следующую ошибку:

 Exception in thread "main" java.lang.IllegalStateException: Unable to return a default Coder for Outputdata/Map/ParMultiDo(Anonymous).output [PCollection]. Correct one of the following root causes:
  No Coder has been manually specified;  you may do so using .setCoder().
  Inferring a Coder from the CoderRegistry failed: Cannot provide a coder for a Beam Row. Please provide a schema instead using PCollection.setRowSchema.
  Using the default output Coder from the producing PTransform failed: PTransform.getOutputCoder called.
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState(Preconditions.java:507)
    at org.apache.beam.sdk.values.PCollection.getCoder(PCollection.java:278)
    at org.apache.beam.sdk.values.PCollection.finishSpecifying(PCollection.java:114)
    at org.apache.beam.sdk.runners.TransformHierarchy.finishSpecifying(TransformHierarchy.java:263)
    at org.apache.beam.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:249)
    at org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:463)
    at org.apache.beam.sdk.Pipeline.validate(Pipeline.java:582)
    at org.apache.beam.sdk.Pipeline.run(Pipeline.java:316)
    at org.apache.beam.sdk.Pipeline.run(Pipeline.java:303)
    at org.apache.beam.examples.ReadAvro.main(ReadAvro.java:39)
  

Может кто-нибудь сказать мне, что не так с кодом и как решить? И да, запись avro соответствует заданной схеме sample.avsc.