#java #apache-beam #capture #incompatibletypeerror
Вопрос:
У меня возникли проблемы с использованием MultiOutputReciver для PCollectionTuple с тегом TupleTag в Apache Beam.
logSchema-это мой сгенерированный класс для обработки входящих журналов. дата, тип, сообщение и т.д. Что я хочу сделать, так это сохранить различные типы журналов (ошибки, предупреждения, уведомления) в разных коллекциях ПК.
Я получаю эту ошибку java: incompatible types: logSchema cannot be converted to capture#1 of ?
для каждого out.get(tags.get(0)).output(log);
внутри processElement
внутри branching extends DoFn<logSchema, logSchema>
В основном Required type: capture of ? Provided: logSchema
Я в основном следовал Руководству по программированию луча, охватывающему дополнительные выходы, но также и другим примерам, которые я мог бы найти здесь.
Кто-нибудь хочет объяснить, в чем я ошибаюсь? Я чувствую себя потерянным, но в то же время близким.
Редактировать: Похоже, я забыл .withOutputTags() на ветвящемся ПарДо ветвления. Добавил это в приведенный ниже код, по-прежнему получая ту же ошибку несовместимых типов. ИДЕЯ красит ее (делала это и раньше) и хочет привести к (PCollectionTuple), зачем это нужно?
Вот мой код
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
import java.text.ParseException;
public class Pipe extends Thread {
public Pipe() {
}
static class conformToSchema extends DoFn<String, logSchema> {
@ProcessElement
public void processElement(@Element String element, OutputReceiver<logSchema> receiver ) throws ParseException {
logSchema log = new logSchema(element);
receiver.output(log);
}
}
static class branching extends DoFn<logSchema, logSchema> {
private TupleTagList tags;
public branching(TupleTagList tags) {
this.tags = tags;
}
@ProcessElement
public void processElement(@Element logSchema log, MultiOutputReceiver out ) {
if (log.getType().equals("[notice]")) out.get(tags.get(0)).output(log);
else if (log.getType().equals("[error]")) out.get(tags.get(1)).output(log);
else if (log.getType().equals("[warn]")) out.get(tags.get(2)).output(log);
else if (log.getType().equals("[sout]") ) out.get(tags.get(3)).output(log);
}
}
public void run(){
TupleTag<logSchema> all = new TupleTag<>();
TupleTag<logSchema> noticesTag = new TupleTag<>();
TupleTag<logSchema> errorsTag = new TupleTag<>();
TupleTag<logSchema> warningsTag = new TupleTag<>();
TupleTag<logSchema> soutTag = new TupleTag<>();
TupleTagList tags = TupleTagList.of(noticesTag).and(errorsTag).and(warningsTag).and(soutTag);
PipelineOptions options = PipelineOptionsFactory.create();
Pipeline p = Pipeline.create();
PCollection<String> input = p.apply(TextIO.read().from("C:...."));
PCollection logObjects = input
.apply("Conform", ParDo.of(
new conformToSchema()));
PCollectionTuple multipleOutputs = (PCollectionTuple) logObjects.apply("Branch", ParDo.of(new branching(tags)).withOutputTags(all, tags));
PCollection<logSchema> notices = multipleOutputs.get(noticesTag);
PCollection<logSchema> errors = multipleOutputs.get(errorsTag);
PCollection<logSchema> warning = multipleOutputs.get(warningsTag);
PCollection<logSchema> sout = multipleOutputs.get(soutTag);
try {
p.run().waitUntilFinish();
} catch (Exception e) {
e.printStackTrace();
}
}
}
Кстати, это первый пост здесь, ура.