Используйте TableProvider для создания таблицы и выполнения SQL-запроса в Apache Beam

#apache-beam

#apache-beam

Вопрос:

Я хочу сгенерировать неограниченную коллекцию строк и выполнить к ней SQL-запрос, используя диалект SQL Apache Beam Calcite и Apache Flink runner. Основываясь на исходном коде и документации Apache Beam, можно сделать что-то подобное с помощью поставщика таблиц: GenerateSequenceTableProvider . Но я не понимаю, как использовать его вне командной строки Beam SQL. Я хотел бы использовать его в своем обычном коде Java.

Я пытался сделать что-то вроде этого:

 PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
Pipeline pipeline = Pipeline.create(options);

GenerateSequenceTableProvider tableProvider = new GenerateSequenceTableProvider();
tableProvider.createTable(Table.builder()
        .name("sequence")
        .schema(Schema.of(Schema.Field.of("sequence", Schema.FieldType.INT64), Schema.Field.of("event_time", Schema.FieldType.DATETIME)))
        .type(tableProvider.getTableType())
        .build()
);
PCollection<Row> res = PCollectionTuple.empty(pipeline).apply(SqlTransform.query("select * from sequenceSchema.sequence limit 5").withTableProvider("sequenceSchema", tableProvider));

pipeline.run().waitUntilFinish();
 

Но я получаю Object 'sequence' not found within 'sequenceSchema' ошибки, поэтому, я думаю, я на самом деле не создаю таблицу. Итак, как мне создать таблицу? Если я правильно понимаю, значения должны предоставляться автоматически поставщиком таблиц.

В принципе, как использовать поставщиков таблиц Beam SQL, если я хочу выполнять запросы к таблицам, которые эти поставщики должны (я думаю?) Генерировать?

Ответ №1:

С интерфейсом TableProvider немного сложно работать напрямую. Проблема, с которой вы сталкиваетесь, заключается в том, что GenerateSquenceTableProvider , как и у многих других TableProviders, нет никакого способа хранить метаданные таблицы самостоятельно. Поэтому вызов его createTable метода на самом деле не работает! Что вам нужно сделать, это обернуть его в InMemoryMetaStore, что-то вроде этого:

 GenerateSequenceTableProvider tableProvider = new GenerateSequenceTableProvider();
InMemoryMetaStore metaStore = new InMemoryMetaStore();

metaStore.registerProvider(tableProvider);

metaStore.createTable(Table.builder()
        .name("sequence")
        .schema(Schema.of(Schema.Field.of("sequence", Schema.FieldType.INT64), Schema.Field.of("event_time", Schema.FieldType.DATETIME)))
        .type(tableProvider.getTableType())
        .build()
);
PCollection<Row> res = PCollectionTuple.empty(pipeline)
  .apply(SqlTransform.query("select * from sequenceSchema.sequence limit 5")
           .withTableProvider("sequenceSchema", metaStore));
 

(Обратите внимание, что я не тестировал это, но я думаю, что что-то подобное должно работать)

Как указал robertwb, другим вариантом было бы просто избежать интерфейса TableProvider и использовать GenerateSequence напрямую. Вам просто нужно убедиться, что ваш PCollection имеет схему. Затем вы можете обработать его с помощью SqlTransform, например:

 pc.apply(SqlTransform.query("select * from PCOLLECTION limit 5"))
 

Комментарии:

1. Спасибо, это почти работает! Просто небольшая опечатка: она должна быть metaStore.registerProvider(tableProvider);

Ответ №2:

Если вы не можете заставить TableProviders работать, вы можете прочитать это как обычное PCollection , а затем применить a SqlTransform к результату.