#apache-flink
#apache-flink
Вопрос:
У меня есть следующий класс POJO,
import com.datastax.driver.mapping.annotations.Column;
import com.datastax.driver.mapping.annotations.Table;
@Table(keyspace = "testKey", name = "contact")
public class Person implements Serializable {
private static final long serialVersionUID = 1L;
@Column(name = "name")
private String name;
@Column(name = "timeStamp")
private LocalDateTime timeStamp;
}
и код сопоставления является,
DataStream<Reading> sideOutput = stream.flatMap(new FlatMapFunction<String, Person>() {
@Override
public void flatMap(String value, Collector<Person> out) throws Exception {
try {
out.collect(objectMapper.readValue(value, Person.class));
} catch (JsonProcessingException e) {
e.printStackTrace();
}
}
}).getSideOutput(new OutputTag<>("contact", TypeInformation.of(Person.class)));
env.execute();
CassandraSink.addSink(sideOutput)
.setHost("localhost")
.setMapperOptions(() -> new Mapper.Option[]{Mapper.Option.saveNullFields(true)})
.build();
Это также не работает без .getSideOutput(new OutputTag<>("contact", TypeInformation.of(Person.class)));
.
sideOutput
Не передающее значение для сохранения в Cassandra. есть идеи, где я делаю неправильно?
Ответ №1:
Я бы сказал, env.execute();
должен вызываться после сборки конвейера, т. е. после CassandraSink
и избавился бы от побочного вывода. Что-то вроде этого должно сработать:
DataStream<Reading> ds = stream.flatMap(new FlatMapFunction<String, Person>() {
@Override
public void flatMap(String value, Collector<Person> out) throws Exception {
try {
out.collect(objectMapper.readValue(value, Person.class));
} catch (JsonProcessingException e) {
e.printStackTrace();
}
}
});
CassandraSink.addSink(ds)
.setHost("localhost")
.setMapperOptions(() -> new Mapper.Option[]{Mapper.Option.saveNullFields(true)})
.build();
env.execute();
Комментарии:
1. Я пробовал с выполнением
env.execute();
. после сборки конвейера ошибкаNo support for the type of the given DataStream: GenericType<com.streaming.model.Person> at org.apache.flink.streaming.connectors.cassandra.CassandraSink.addSink(CassandraSink.java:228)
2. Это отсутствие
TypeInformation
проблемы дляPerson
класса. Это определяет ряд правил для POJO: ci.apache.org/projects/flink/flink-docs-stable/dev /… . Не могли бы вы, пожалуйста, убедиться, что все правила выполнены?3. Во-первых, я бы добавил общедоступный конструктор без аргументов в
Person
и определил бы общедоступные методы получения и установки, которые соответствуют соглашениям об именовании Java beans для методов получения и установки полейname
иtimeStamp
.4. Я следовал правилам
TypeInformation
,Person
класс содержит средства получения, установки. Я сталкиваюсь с каким-то странным исключением утечки памяти, gist.github.com/thangavel-projects /…5. Итак, прямо сейчас задание запускается, но сообщается об исключении утечки памяти? Выполняется задание или нет?