#apache-beam
#apache-beam
Вопрос:
Я хочу сгенерировать неограниченную коллекцию строк и выполнить к ней SQL-запрос, используя диалект SQL Apache Beam Calcite и Apache Flink runner. Основываясь на исходном коде и документации Apache Beam, можно сделать что-то подобное с помощью поставщика таблиц: GenerateSequenceTableProvider . Но я не понимаю, как использовать его вне командной строки Beam SQL. Я хотел бы использовать его в своем обычном коде Java.
Я пытался сделать что-то вроде этого:
PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
Pipeline pipeline = Pipeline.create(options);
GenerateSequenceTableProvider tableProvider = new GenerateSequenceTableProvider();
tableProvider.createTable(Table.builder()
.name("sequence")
.schema(Schema.of(Schema.Field.of("sequence", Schema.FieldType.INT64), Schema.Field.of("event_time", Schema.FieldType.DATETIME)))
.type(tableProvider.getTableType())
.build()
);
PCollection<Row> res = PCollectionTuple.empty(pipeline).apply(SqlTransform.query("select * from sequenceSchema.sequence limit 5").withTableProvider("sequenceSchema", tableProvider));
pipeline.run().waitUntilFinish();
Но я получаю Object 'sequence' not found within 'sequenceSchema'
ошибки, поэтому, я думаю, я на самом деле не создаю таблицу. Итак, как мне создать таблицу? Если я правильно понимаю, значения должны предоставляться автоматически поставщиком таблиц.
В принципе, как использовать поставщиков таблиц Beam SQL, если я хочу выполнять запросы к таблицам, которые эти поставщики должны (я думаю?) Генерировать?
Ответ №1:
С интерфейсом TableProvider немного сложно работать напрямую. Проблема, с которой вы сталкиваетесь, заключается в том, что GenerateSquenceTableProvider
, как и у многих других TableProviders, нет никакого способа хранить метаданные таблицы самостоятельно. Поэтому вызов его createTable
метода на самом деле не работает! Что вам нужно сделать, это обернуть его в InMemoryMetaStore, что-то вроде этого:
GenerateSequenceTableProvider tableProvider = new GenerateSequenceTableProvider();
InMemoryMetaStore metaStore = new InMemoryMetaStore();
metaStore.registerProvider(tableProvider);
metaStore.createTable(Table.builder()
.name("sequence")
.schema(Schema.of(Schema.Field.of("sequence", Schema.FieldType.INT64), Schema.Field.of("event_time", Schema.FieldType.DATETIME)))
.type(tableProvider.getTableType())
.build()
);
PCollection<Row> res = PCollectionTuple.empty(pipeline)
.apply(SqlTransform.query("select * from sequenceSchema.sequence limit 5")
.withTableProvider("sequenceSchema", metaStore));
(Обратите внимание, что я не тестировал это, но я думаю, что что-то подобное должно работать)
Как указал robertwb, другим вариантом было бы просто избежать интерфейса TableProvider и использовать GenerateSequence напрямую. Вам просто нужно убедиться, что ваш PCollection имеет схему. Затем вы можете обработать его с помощью SqlTransform, например:
pc.apply(SqlTransform.query("select * from PCOLLECTION limit 5"))
Комментарии:
1. Спасибо, это почти работает! Просто небольшая опечатка: она должна быть
metaStore.registerProvider(tableProvider);
Ответ №2:
Если вы не можете заставить TableProviders работать, вы можете прочитать это как обычное PCollection
, а затем применить a SqlTransform
к результату.