#java #apache-flink
#java #apache-flink
Вопрос:
У меня возникли некоторые проблемы с использованием flink SerializationSchema.
Вот мой основной код :
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DeserializationSchema<Row> sourceDeserializer = new JsonRowDeserializationSchema.Builder( /*Extract TypeInformation<Row> from an avsc schema file*/ ).build();
DataStream<Row> myDataStream = env.addSource( new MyCustomSource(sourceDeserializer) ) ;
final SinkFunction<Row> sink = new MyCustomSink(new JsonRowSerializationSchema.Builder(myDataStream.getType()).build());
myDataStream.addSink(sink).name("MyCustomSink");
env.execute("MyJob");
Вот моя пользовательская функция приемника :
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.types.Row;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@SuppressWarnings("serial")
public class MyCustomSink implements SinkFunction<Row> {
private static final Logger LOGGER = LoggerFactory.getLogger(MyCustomSink.class);
private final boolean print;
private final SerializationSchema<Row> serializationSchema;
public MyCustomSink(final SerializationSchema<Row> serializationSchema) {
this.serializationSchema = serializationSchema;
}
@Override
public void invoke(final Row value, final Context context) throws Exception {
try {
LOGGER.info("MyCustomSink- invoke : [{}]", new String(serializationSchema.serialize(value)));
}catch (Exception e){
LOGGER.error("MyCustomSink- Error while sending data : " e);
}
}
}
И вот моя пользовательская исходная функция (не уверен, что она полезна для моей проблемы) :
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.shaded.guava18.com.google.common.io.ByteStreams;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class MyCustomSource<T> extends RichSourceFunction<T> implements ResultTypeQueryable<T> {
/** logger */
private static final Logger LOGGER = LoggerFactory.getLogger(MyCustomSource.class);
/** the JSON deserializer */
private final DeserializationSchema<T> deserializationSchema;
public MyCustomSource(final DeserializationSchema<T> deserializer) {
this.deserializationSchema = deserializer;
}
@Override
public void open(final Configuration parameters) {
...
}
@Override
public void run(final SourceContext<T> ctx) throws Exception {
LOGGER.info("run");
InputStream data = ...; // Retrieve the input json data
final T row = deserializationSchema
.deserialize(ByteStreams.toByteArray(data));
ctx.collect(row);
}
@Override
public void cancel() {
...
}
@Override
public TypeInformation<T> getProducedType() {
return deserializationSchema.getProducedType();
}
}
Теперь я запускаю свой код и последовательно отправляю некоторые данные в свой конвейер :
==>
{
"id": "sensor1",
"data":{
"rotation": 250
}
}
Здесь данные правильно напечатаны моим приемником : MyCustomSink- invoke : [{"id":"sensor1","data":{"rotation":250}}]
==>
{
"id": "sensor1"
}
Здесь данные правильно напечатаны моим приемником : MyCustomSink- invoke : [{"id":"sensor1","data":null}]
==>
{
"id": "sensor1",
"data":{
"rotation": 250
}
}
Здесь возникает ошибка при сериализации. Распечатанный журнал ошибок является :
MyCustomSink- Error while sending data : java.lang.RuntimeException: Could not serialize row 'sensor1,250'. Make sure that the schema matches the input.
Я вообще не понимаю, почему у меня такое поведение. У кого-нибудь есть идея?
Примечания:
- Использование Flink 1.9.2
— РЕДАКТИРОВАТЬ —
Я добавил часть пользовательского источника
— ПРАВКА 2 —
После дополнительных исследований, похоже, что такое поведение вызвано private transient ObjectNode node
из JsonRowSerializationSchema
. Если я правильно понимаю, это используется для оптимизации, но, похоже, является причиной моей проблемы.
Является ли это нормальным поведением, и если да, то каким было бы правильное использование этого класса в моем случае? (Иначе, есть ли какой-либо способ обойти эту проблему?)
Комментарии:
1. Возможно ли для вас попробовать самую последнюю версию flink, 1.11? Я думаю, вероятно, была связанная проблема, исправленная в этом PR: github.com/apache/flink/pull/11180/files /…
2. Да, точно, я только что узнал то же самое. Это связано с ошибкой, которая была исправлена в Flink 1.10 Спасибо!
3. Могу ли я опубликовать это в качестве ответа, пожалуйста?
4. Конечно, дерзайте
Ответ №1:
Это JsonRowSerializationSchema
ошибка, которая была исправлена в самых последних версиях Flink — я полагаю, этот PR решает проблему выше.