JdbcIO.read не возвращает результаты в apache beam

#java #apache-beam

Вопрос:

Я пытаюсь прочитать некоторые данные с помощью jdbIO.read в apache beam, и это работает нормально, если у меня есть следующий код.

 Pipeline p = createPipeline(options);
p.apply(JdbcIO.<TestRow>read()
                   .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(dataSource))
                        .withQuery("query  ")
                        .withCoder(SerializableCoder.of(TestRow.class))
                        .withRowMapper(new JdbcIO.RowMapper<TestRow>() {
                            @Override
                            public TestRow mapRow(ResultSet resultSet) throws Exception {
                                TestRow testRow = new TestRow();
                                //setters

                                return testRow;
                            }
                        }))
                .apply(MapElements.via(new SimpleFunction<TestRow, String>() {
                    @Override
                    public String apply(TestRow input) {
                        
                        return input.toString();
                    }
                }));
 

Но не получаю никаких результатов, когда я рефакторирую это таким образом, чтобы удалить анонимные функции и поместить этот вызов в отдельный класс и расширить класс DoFn. блок сопоставления строк вообще не выполняется.

 PCollection<String> t = p
                 .apply(Create.<Long>of(1L))
                 .apply("Read Data", ParDo.of(readInput))

public abstract class ReadInput<S, T> extends DoFn<Long, TestRow> {

     @DoFn.ProcessElement
    public void processElement(@Element Long seq, final OutputReceiver<TestRow> receiver) {
        getInput(receiver);

public class ReadInputOtc extends ReadInput<Long, TestRow>
@Override
    protected void getInput(OutputReceiver<TestRow> receiver) {
        JdbcIO.<TestRow>read()
                .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(this.dataSource))
                .withCoder(SerializableCoder.of(TestRow.class))
                .withQuery("query ")
                .withRowMapper(new JdbcIO.RowMapper<TestRow>() {
                    public TestRow mapRow(ResultSet resultSet) throws Exception {
                        TestRow testRow = new TestRow();
                       //setters
                        while (resultSet.next()) {
                            System.out.println(resultSet.getString("id"));
                        }
                        receiver.output(testRow);
                        return testRow;

                    }

                });
    }
 

спасибо за вашу помощь

Комментарии:

1. Это ваш реальный компилируемый код? Должен return TestRow; ли на самом деле возвращать экземпляр, а не класс?

2. ошибка копирования / вставки, я обновил это, спасибо

Ответ №1:

JdbcIO.<TestRow>read() просто создает PTransform для чтения, фактически он не выполняет никакого чтения. Чтобы выполнить чтение, оно должно быть применено к объекту конвейера (как у вас в вашем первом примере), который создает PCollection записей. PTransforms не предназначены для использования в DoFn, DoFn действуют на отдельные элементы, а не на коллекции элементов.

Если вы пытаетесь удалить анонимные классы, вы можете написать свой код следующим образом

 [public static] class MuRowMapper extends JdbcIO.RowMapper<TestRow> {
    @Override
    public TestRow mapRow(ResultSet resultSet) throws Exception {
        TestRow testRow = new TestRow();
        ...
        return testRow;
    }
}

[public static] class MyDoFn extends DoFn<MyRow, String> {
    @DoFn.ProcessElement
    public void processElement(@Element TestRow testRow, 
                               final OutputReceiver<String> receiver) {
        return receiver.output(testRow.toString());
    }
}

Pipeline p = createPipeline(options);
p
    .apply(JdbcIO.<TestRow>read()
                 .withDataSourceConfiguration(
                     JdbcIO.DataSourceConfiguration.create(dataSource))
                 .withQuery("query  ")
                 .withCoder(SerializableCoder.of(TestRow.class))
                 .withRowMapper(new MyRowMapper()))
    .apply(ParDo.of(new MyDoFn()));