Flink SerializationSchema: не удалось сериализовать ошибку строки

#java #apache-flink

#java #apache-flink

Вопрос:

У меня возникли некоторые проблемы с использованием flink SerializationSchema.

Вот мой основной код :

 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DeserializationSchema<Row> sourceDeserializer = new JsonRowDeserializationSchema.Builder( /*Extract TypeInformation<Row> from an avsc schema file*/ ).build();
DataStream<Row> myDataStream = env.addSource( new MyCustomSource(sourceDeserializer) ) ;
final SinkFunction<Row> sink = new MyCustomSink(new JsonRowSerializationSchema.Builder(myDataStream.getType()).build());
myDataStream.addSink(sink).name("MyCustomSink");

env.execute("MyJob");
  

Вот моя пользовательская функция приемника :

 import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.types.Row;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@SuppressWarnings("serial")
public class MyCustomSink implements SinkFunction<Row> {

    private static final Logger LOGGER = LoggerFactory.getLogger(MyCustomSink.class);
    private final boolean print;
    private final SerializationSchema<Row> serializationSchema;

    public MyCustomSink(final SerializationSchema<Row> serializationSchema) {
        this.serializationSchema = serializationSchema;
    }

    @Override
    public void invoke(final Row value, final Context context) throws Exception {

        try {
            LOGGER.info("MyCustomSink- invoke : [{}]", new String(serializationSchema.serialize(value)));
        }catch (Exception e){
            LOGGER.error("MyCustomSink- Error while sending data : "   e);
        }
    }
}
  

И вот моя пользовательская исходная функция (не уверен, что она полезна для моей проблемы) :

 import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.shaded.guava18.com.google.common.io.ByteStreams;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MyCustomSource<T> extends RichSourceFunction<T> implements ResultTypeQueryable<T> {

    /** logger */
    private static final Logger LOGGER = LoggerFactory.getLogger(MyCustomSource.class);

    /** the JSON deserializer */
    private final DeserializationSchema<T> deserializationSchema;

    public MyCustomSource(final DeserializationSchema<T> deserializer) {
        this.deserializationSchema = deserializer;
    }

    @Override
    public void open(final Configuration parameters) {
        ...
    }

    @Override
    public void run(final SourceContext<T> ctx) throws Exception {
        LOGGER.info("run");
        InputStream data = ...; // Retrieve the input json data
        final T row = deserializationSchema
                        .deserialize(ByteStreams.toByteArray(data));
        ctx.collect(row);
        
    }

    @Override
    public void cancel() {
        ...
    }

    @Override
    public TypeInformation<T> getProducedType() {
        return deserializationSchema.getProducedType();
    }
}
  

Теперь я запускаю свой код и последовательно отправляю некоторые данные в свой конвейер :

 ==> 
{
    "id": "sensor1",
    "data":{
        "rotation": 250
    }
}
  

Здесь данные правильно напечатаны моим приемником : MyCustomSink- invoke : [{"id":"sensor1","data":{"rotation":250}}]

 ==> 
{
    "id": "sensor1"
}
  

Здесь данные правильно напечатаны моим приемником : MyCustomSink- invoke : [{"id":"sensor1","data":null}]

 ==> 
{
    "id": "sensor1",
    "data":{
        "rotation": 250
    }
}
  

Здесь возникает ошибка при сериализации. Распечатанный журнал ошибок является :

 MyCustomSink- Error while sending data : java.lang.RuntimeException: Could not serialize row 'sensor1,250'. Make sure that the schema matches the input.
  

Я вообще не понимаю, почему у меня такое поведение. У кого-нибудь есть идея?

Примечания:

  • Использование Flink 1.9.2

— РЕДАКТИРОВАТЬ —

Я добавил часть пользовательского источника

— ПРАВКА 2 —

После дополнительных исследований, похоже, что такое поведение вызвано private transient ObjectNode node из JsonRowSerializationSchema . Если я правильно понимаю, это используется для оптимизации, но, похоже, является причиной моей проблемы.

Является ли это нормальным поведением, и если да, то каким было бы правильное использование этого класса в моем случае? (Иначе, есть ли какой-либо способ обойти эту проблему?)

Комментарии:

1. Возможно ли для вас попробовать самую последнюю версию flink, 1.11? Я думаю, вероятно, была связанная проблема, исправленная в этом PR: github.com/apache/flink/pull/11180/files /…

2. Да, точно, я только что узнал то же самое. Это связано с ошибкой, которая была исправлена в Flink 1.10 Спасибо!

3. Могу ли я опубликовать это в качестве ответа, пожалуйста?

4. Конечно, дерзайте

Ответ №1:

Это JsonRowSerializationSchema ошибка, которая была исправлена в самых последних версиях Flink — я полагаю, этот PR решает проблему выше.