Приемник файлов Avro SpecificRecord с использованием apache flink не компилируется из-за ошибки несовместимые типы: Файловая ссылка не может быть преобразована в функцию SinkFunction

#apache-flink

Вопрос:

У меня есть ниже схема avro User.avsc

 {
  "type": "record",
  "namespace": "com.myorg",
  "name": "User",
  "fields": [
    {
      "name": "id",
      "type": "long"
    },
    {
      "name": "name",
      "type": "string"
    }
  ]
}
 

Приведенный ниже User.java класс java генерируется из указанного выше User.avsc с помощью плагина avro-maven.

 package com.myorg;

import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.nio.ByteBuffer;
import org.apache.avro.AvroRuntimeException;
import org.apache.avro.Schema;
import org.apache.avro.Schema.Parser;
import org.apache.avro.data.RecordBuilder;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.message.BinaryMessageDecoder;
import org.apache.avro.message.BinaryMessageEncoder;
import org.apache.avro.message.SchemaStore;
import org.apache.avro.specific.AvroGenerated;
import org.apache.avro.specific.SpecificData;
import org.apache.avro.specific.SpecificRecord;
import org.apache.avro.specific.SpecificRecordBase;
import org.apache.avro.specific.SpecificRecordBuilderBase;

@AvroGenerated
public class User extends SpecificRecordBase implements SpecificRecord {
    private static final long serialVersionUID = 8699049231783654635L;
    public static final Schema SCHEMA$ = (new Parser()).parse("{"type":"record","name":"User","namespace":"com.myorg","fields":[{"name":"id","type":"long"},{"name":"name","type":{"type":"string","avro.java.string":"String"}}]}");
    private static SpecificData MODEL$ = new SpecificData();
    private static final BinaryMessageEncoder<User> ENCODER;
    private static final BinaryMessageDecoder<User> DECODER;
    /** @deprecated */
    @Deprecated
    public long id;
    /** @deprecated */
    @Deprecated
    public String name;
    private static final DatumWriter<User> WRITER$;
    private static final DatumReader<User> READER$;

    public static Schema getClassSchema() {
        return SCHEMA$;
    }

    public static BinaryMessageDecoder<User> getDecoder() {
        return DECODER;
    }

    public static BinaryMessageDecoder<User> createDecoder(SchemaStore resolver) {
        return new BinaryMessageDecoder(MODEL$, SCHEMA$, resolver);
    }

    public ByteBuffer toByteBuffer() throws IOException {
        return ENCODER.encode(this);
    }

    public static User fromByteBuffer(ByteBuffer b) throws IOException {
        return (User)DECODER.decode(b);
    }

    public User() {
    }

    public User(Long id, String name) {
        this.id = id;
        this.name = name;
    }

    public Schema getSchema() {
        return SCHEMA$;
    }

    public Object get(int field$) {
        switch(field$) {
        case 0:
            return this.id;
        case 1:
            return this.name;
        default:
            throw new AvroRuntimeException("Bad index");
        }
    }

    public void put(int field$, Object value$) {
        switch(field$) {
        case 0:
            this.id = (Long)value$;
            break;
        case 1:
            this.name = (String)value$;
            break;
        default:
            throw new AvroRuntimeException("Bad index");
        }

    }

    public Long getId() {
        return this.id;
    }

    public void setId(Long value) {
        this.id = value;
    }

    public String getName() {
        return this.name;
    }

    public void setName(String value) {
        this.name = value;
    }

    public void writeExternal(ObjectOutput out) throws IOException {
        WRITER$.write(this, SpecificData.getEncoder(out));
    }

    public void readExternal(ObjectInput in) throws IOException {
        READER$.read(this, SpecificData.getDecoder(in));
    }

    static {
        ENCODER = new BinaryMessageEncoder(MODEL$, SCHEMA$);
        DECODER = new BinaryMessageDecoder(MODEL$, SCHEMA$);
        WRITER$ = MODEL$.createDatumWriter(SCHEMA$);
        READER$ = MODEL$.createDatumReader(SCHEMA$);
    }

}
 

Я хочу записать экземпляр User SpecificRecord в файл, используя файловую ссылку apache flink.

Ниже приведена программа, которую я написал —

 import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.avro.AvroWriters;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import com.myorg.User;

public class AvroFileSinkApp {

    private static final String OUTPUT_PATH = "./il/";
    public static void main(String[] args) {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.setParallelism(4);
        env.enableCheckpointing(5000L);

        DataStream<User> source = env.fromCollection(Arrays.asList(getUser()));
        FileSink<User> sink = org.apache.flink.connector.file.sink.FileSink.forBulkFormat(new Path(OUTPUT_PATH), AvroWriters.forSpecificRecord(User.class)).build();

        source.addSink( sink);
        env.execute("FileSinkProgram");
    }

    public static User getUser() {
        User u = new User();
        u.setId(1L);
        return u;
    }
}
 

Я написал эту программу, используя это и это в качестве ссылки. По какой — то причине строка source.addSink( sink); выдает ошибку компиляции ниже.

несовместимые типы: org.apache.флинк.разъем.файл.раковина.Ссылка на файл<com.myorg.User> не может быть преобразована в org.apache.flink.потоковая передача.api.функции.раковина.Функция SinkFunction<com.myorg.User>

Проект находится на github здесь

Ответ №1:

Возможно, вы можете взглянуть на интерфейс потока данных. Входной параметр функции addSink имеет тип SinkFunction, а входным параметром функции sinkTo является Sink.

Ссылка файлов реализована на основе интерфейса приемника, вы должны использовать функцию sinkTo

 public class DataStream<T> {
......
    public DataStreamSink<T> addSink(SinkFunction<T> sinkFunction) {

        // read the output type of the input Transform to coax out errors about MissingTypeInfo
        transformation.getOutputType();

        // configure the type if needed
        if (sinkFunction instanceof InputTypeConfigurable) {
            ((InputTypeConfigurable) sinkFunction).setInputType(getType(), getExecutionConfig());
        }

        StreamSink<T> sinkOperator = new StreamSink<>(clean(sinkFunction));

        DataStreamSink<T> sink = new DataStreamSink<>(this, sinkOperator);

        getExecutionEnvironment().addOperator(sink.getTransformation());
        return sink;
    }

    /**
     * Adds the given {@link Sink} to this DataStream. Only streams with sinks added will be
     * executed once the {@link StreamExecutionEnvironment#execute()} method is called.
     *
     * @param sink The user defined sink.
     * @return The closed DataStream.
     */
    @Experimental
    public DataStreamSink<T> sinkTo(Sink<T, ?, ?, ?> sink) {
        // read the output type of the input Transform to coax out errors about MissingTypeInfo
        transformation.getOutputType();

        return new DataStreamSink<>(this, sink);
    }
......
}