#apache-flink
Вопрос:
У меня есть ниже схема avro User.avsc
{
"type": "record",
"namespace": "com.myorg",
"name": "User",
"fields": [
{
"name": "id",
"type": "long"
},
{
"name": "name",
"type": "string"
}
]
}
Приведенный ниже User.java
класс java генерируется из указанного выше User.avsc с помощью плагина avro-maven.
package com.myorg;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.nio.ByteBuffer;
import org.apache.avro.AvroRuntimeException;
import org.apache.avro.Schema;
import org.apache.avro.Schema.Parser;
import org.apache.avro.data.RecordBuilder;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.message.BinaryMessageDecoder;
import org.apache.avro.message.BinaryMessageEncoder;
import org.apache.avro.message.SchemaStore;
import org.apache.avro.specific.AvroGenerated;
import org.apache.avro.specific.SpecificData;
import org.apache.avro.specific.SpecificRecord;
import org.apache.avro.specific.SpecificRecordBase;
import org.apache.avro.specific.SpecificRecordBuilderBase;
@AvroGenerated
public class User extends SpecificRecordBase implements SpecificRecord {
private static final long serialVersionUID = 8699049231783654635L;
public static final Schema SCHEMA$ = (new Parser()).parse("{"type":"record","name":"User","namespace":"com.myorg","fields":[{"name":"id","type":"long"},{"name":"name","type":{"type":"string","avro.java.string":"String"}}]}");
private static SpecificData MODEL$ = new SpecificData();
private static final BinaryMessageEncoder<User> ENCODER;
private static final BinaryMessageDecoder<User> DECODER;
/** @deprecated */
@Deprecated
public long id;
/** @deprecated */
@Deprecated
public String name;
private static final DatumWriter<User> WRITER$;
private static final DatumReader<User> READER$;
public static Schema getClassSchema() {
return SCHEMA$;
}
public static BinaryMessageDecoder<User> getDecoder() {
return DECODER;
}
public static BinaryMessageDecoder<User> createDecoder(SchemaStore resolver) {
return new BinaryMessageDecoder(MODEL$, SCHEMA$, resolver);
}
public ByteBuffer toByteBuffer() throws IOException {
return ENCODER.encode(this);
}
public static User fromByteBuffer(ByteBuffer b) throws IOException {
return (User)DECODER.decode(b);
}
public User() {
}
public User(Long id, String name) {
this.id = id;
this.name = name;
}
public Schema getSchema() {
return SCHEMA$;
}
public Object get(int field$) {
switch(field$) {
case 0:
return this.id;
case 1:
return this.name;
default:
throw new AvroRuntimeException("Bad index");
}
}
public void put(int field$, Object value$) {
switch(field$) {
case 0:
this.id = (Long)value$;
break;
case 1:
this.name = (String)value$;
break;
default:
throw new AvroRuntimeException("Bad index");
}
}
public Long getId() {
return this.id;
}
public void setId(Long value) {
this.id = value;
}
public String getName() {
return this.name;
}
public void setName(String value) {
this.name = value;
}
public void writeExternal(ObjectOutput out) throws IOException {
WRITER$.write(this, SpecificData.getEncoder(out));
}
public void readExternal(ObjectInput in) throws IOException {
READER$.read(this, SpecificData.getDecoder(in));
}
static {
ENCODER = new BinaryMessageEncoder(MODEL$, SCHEMA$);
DECODER = new BinaryMessageDecoder(MODEL$, SCHEMA$);
WRITER$ = MODEL$.createDatumWriter(SCHEMA$);
READER$ = MODEL$.createDatumReader(SCHEMA$);
}
}
Я хочу записать экземпляр User SpecificRecord в файл, используя файловую ссылку apache flink.
Ниже приведена программа, которую я написал —
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.avro.AvroWriters;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import com.myorg.User;
public class AvroFileSinkApp {
private static final String OUTPUT_PATH = "./il/";
public static void main(String[] args) {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(4);
env.enableCheckpointing(5000L);
DataStream<User> source = env.fromCollection(Arrays.asList(getUser()));
FileSink<User> sink = org.apache.flink.connector.file.sink.FileSink.forBulkFormat(new Path(OUTPUT_PATH), AvroWriters.forSpecificRecord(User.class)).build();
source.addSink( sink);
env.execute("FileSinkProgram");
}
public static User getUser() {
User u = new User();
u.setId(1L);
return u;
}
}
Я написал эту программу, используя это и это в качестве ссылки. По какой — то причине строка source.addSink( sink);
выдает ошибку компиляции ниже.
несовместимые типы: org.apache.флинк.разъем.файл.раковина.Ссылка на файл<com.myorg.User> не может быть преобразована в org.apache.flink.потоковая передача.api.функции.раковина.Функция SinkFunction<com.myorg.User>
Проект находится на github здесь
Ответ №1:
Возможно, вы можете взглянуть на интерфейс потока данных. Входной параметр функции addSink имеет тип SinkFunction, а входным параметром функции sinkTo является Sink.
Ссылка файлов реализована на основе интерфейса приемника, вы должны использовать функцию sinkTo
public class DataStream<T> {
......
public DataStreamSink<T> addSink(SinkFunction<T> sinkFunction) {
// read the output type of the input Transform to coax out errors about MissingTypeInfo
transformation.getOutputType();
// configure the type if needed
if (sinkFunction instanceof InputTypeConfigurable) {
((InputTypeConfigurable) sinkFunction).setInputType(getType(), getExecutionConfig());
}
StreamSink<T> sinkOperator = new StreamSink<>(clean(sinkFunction));
DataStreamSink<T> sink = new DataStreamSink<>(this, sinkOperator);
getExecutionEnvironment().addOperator(sink.getTransformation());
return sink;
}
/**
* Adds the given {@link Sink} to this DataStream. Only streams with sinks added will be
* executed once the {@link StreamExecutionEnvironment#execute()} method is called.
*
* @param sink The user defined sink.
* @return The closed DataStream.
*/
@Experimental
public DataStreamSink<T> sinkTo(Sink<T, ?, ?, ?> sink) {
// read the output type of the input Transform to coax out errors about MissingTypeInfo
transformation.getOutputType();
return new DataStreamSink<>(this, sink);
}
......
}