#java #apache-beam
Вопрос:
Я пытаюсь прочитать некоторые данные с помощью jdbIO.read в apache beam, и это работает нормально, если у меня есть следующий код.
Pipeline p = createPipeline(options);
p.apply(JdbcIO.<TestRow>read()
.withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(dataSource))
.withQuery("query ")
.withCoder(SerializableCoder.of(TestRow.class))
.withRowMapper(new JdbcIO.RowMapper<TestRow>() {
@Override
public TestRow mapRow(ResultSet resultSet) throws Exception {
TestRow testRow = new TestRow();
//setters
return testRow;
}
}))
.apply(MapElements.via(new SimpleFunction<TestRow, String>() {
@Override
public String apply(TestRow input) {
return input.toString();
}
}));
Но не получаю никаких результатов, когда я рефакторирую это таким образом, чтобы удалить анонимные функции и поместить этот вызов в отдельный класс и расширить класс DoFn. блок сопоставления строк вообще не выполняется.
PCollection<String> t = p
.apply(Create.<Long>of(1L))
.apply("Read Data", ParDo.of(readInput))
public abstract class ReadInput<S, T> extends DoFn<Long, TestRow> {
@DoFn.ProcessElement
public void processElement(@Element Long seq, final OutputReceiver<TestRow> receiver) {
getInput(receiver);
public class ReadInputOtc extends ReadInput<Long, TestRow>
@Override
protected void getInput(OutputReceiver<TestRow> receiver) {
JdbcIO.<TestRow>read()
.withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(this.dataSource))
.withCoder(SerializableCoder.of(TestRow.class))
.withQuery("query ")
.withRowMapper(new JdbcIO.RowMapper<TestRow>() {
public TestRow mapRow(ResultSet resultSet) throws Exception {
TestRow testRow = new TestRow();
//setters
while (resultSet.next()) {
System.out.println(resultSet.getString("id"));
}
receiver.output(testRow);
return testRow;
}
});
}
спасибо за вашу помощь
Комментарии:
1. Это ваш реальный компилируемый код? Должен
return TestRow;
ли на самом деле возвращать экземпляр, а не класс?2. ошибка копирования / вставки, я обновил это, спасибо
Ответ №1:
JdbcIO.<TestRow>read()
просто создает PTransform для чтения, фактически он не выполняет никакого чтения. Чтобы выполнить чтение, оно должно быть применено к объекту конвейера (как у вас в вашем первом примере), который создает PCollection записей. PTransforms не предназначены для использования в DoFn, DoFn действуют на отдельные элементы, а не на коллекции элементов.
Если вы пытаетесь удалить анонимные классы, вы можете написать свой код следующим образом
[public static] class MuRowMapper extends JdbcIO.RowMapper<TestRow> {
@Override
public TestRow mapRow(ResultSet resultSet) throws Exception {
TestRow testRow = new TestRow();
...
return testRow;
}
}
[public static] class MyDoFn extends DoFn<MyRow, String> {
@DoFn.ProcessElement
public void processElement(@Element TestRow testRow,
final OutputReceiver<String> receiver) {
return receiver.output(testRow.toString());
}
}
Pipeline p = createPipeline(options);
p
.apply(JdbcIO.<TestRow>read()
.withDataSourceConfiguration(
JdbcIO.DataSourceConfiguration.create(dataSource))
.withQuery("query ")
.withCoder(SerializableCoder.of(TestRow.class))
.withRowMapper(new MyRowMapper()))
.apply(ParDo.of(new MyDoFn()));