Отклонение, не передающее значение для сохранения в Cassandra

#apache-flink

#apache-flink

Вопрос:

У меня есть следующий класс POJO,

 import com.datastax.driver.mapping.annotations.Column;
import com.datastax.driver.mapping.annotations.Table;

@Table(keyspace = "testKey", name = "contact")
public class Person implements Serializable {

    private static final long serialVersionUID = 1L;

    @Column(name = "name")
    private String name;

    @Column(name = "timeStamp")
    private LocalDateTime timeStamp;
}
  

и код сопоставления является,

 DataStream<Reading> sideOutput = stream.flatMap(new FlatMapFunction<String, Person>() {
            @Override
            public void flatMap(String value, Collector<Person> out) throws Exception {
                try {
                    out.collect(objectMapper.readValue(value, Person.class));
                } catch (JsonProcessingException e) {
                    e.printStackTrace();
                }
            }
        }).getSideOutput(new OutputTag<>("contact", TypeInformation.of(Person.class)));

 env.execute();
 
 CassandraSink.addSink(sideOutput)
                .setHost("localhost")
                .setMapperOptions(() -> new Mapper.Option[]{Mapper.Option.saveNullFields(true)})
                .build();
  

Это также не работает без .getSideOutput(new OutputTag<>("contact", TypeInformation.of(Person.class))); .

sideOutput Не передающее значение для сохранения в Cassandra. есть идеи, где я делаю неправильно?

Ответ №1:

Я бы сказал, env.execute(); должен вызываться после сборки конвейера, т. е. после CassandraSink и избавился бы от побочного вывода. Что-то вроде этого должно сработать:

 DataStream<Reading> ds = stream.flatMap(new FlatMapFunction<String, Person>() {
            @Override
            public void flatMap(String value, Collector<Person> out) throws Exception {
                try {
                    out.collect(objectMapper.readValue(value, Person.class));
                } catch (JsonProcessingException e) {
                    e.printStackTrace();
                }
            }
        });
 
 CassandraSink.addSink(ds)
                .setHost("localhost")
                .setMapperOptions(() -> new Mapper.Option[]{Mapper.Option.saveNullFields(true)})
                .build();


 env.execute();
  

Комментарии:

1. Я пробовал с выполнением env.execute(); . после сборки конвейера ошибка No support for the type of the given DataStream: GenericType<com.streaming.model.Person> at org.apache.flink.streaming.connectors.cassandra.CassandraSink.addSink(CassandraSink.java:228)

2. Это отсутствие TypeInformation проблемы для Person класса. Это определяет ряд правил для POJO: ci.apache.org/projects/flink/flink-docs-stable/dev /… . Не могли бы вы, пожалуйста, убедиться, что все правила выполнены?

3. Во-первых, я бы добавил общедоступный конструктор без аргументов в Person и определил бы общедоступные методы получения и установки, которые соответствуют соглашениям об именовании Java beans для методов получения и установки полей name и timeStamp .

4. Я следовал правилам TypeInformation , Person класс содержит средства получения, установки. Я сталкиваюсь с каким-то странным исключением утечки памяти, gist.github.com/thangavel-projects /…

5. Итак, прямо сейчас задание запускается, но сообщается об исключении утечки памяти? Выполняется задание или нет?