Элементы, излучающие проблемы, для нескольких выходных коллекций ПК в Apache Beam

#java #apache-beam #capture #incompatibletypeerror

Вопрос:

У меня возникли проблемы с использованием MultiOutputReciver для PCollectionTuple с тегом TupleTag в Apache Beam.

logSchema-это мой сгенерированный класс для обработки входящих журналов. дата, тип, сообщение и т.д. Что я хочу сделать, так это сохранить различные типы журналов (ошибки, предупреждения, уведомления) в разных коллекциях ПК.

Я получаю эту ошибку java: incompatible types: logSchema cannot be converted to capture#1 of ?

для каждого out.get(tags.get(0)).output(log); внутри processElement внутри branching extends DoFn<logSchema, logSchema>

В основном Required type: capture of ? Provided: logSchema

Я в основном следовал Руководству по программированию луча, охватывающему дополнительные выходы, но также и другим примерам, которые я мог бы найти здесь.

Кто-нибудь хочет объяснить, в чем я ошибаюсь? Я чувствую себя потерянным, но в то же время близким.

Редактировать: Похоже, я забыл .withOutputTags() на ветвящемся ПарДо ветвления. Добавил это в приведенный ниже код, по-прежнему получая ту же ошибку несовместимых типов. ИДЕЯ красит ее (делала это и раньше) и хочет привести к (PCollectionTuple), зачем это нужно?

Вот мой код

 import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
import java.text.ParseException;

public class Pipe extends Thread {

    public Pipe() {
    }

    static class conformToSchema extends DoFn<String, logSchema> {
        @ProcessElement
        public void processElement(@Element String element, OutputReceiver<logSchema> receiver ) throws ParseException {
            logSchema log = new logSchema(element);
            receiver.output(log);
        }
    }

    static class branching extends DoFn<logSchema, logSchema> {
        private TupleTagList tags;
        public branching(TupleTagList tags) {
            this.tags = tags;
        }
        @ProcessElement
        public void processElement(@Element logSchema log, MultiOutputReceiver out ) {
            if (log.getType().equals("[notice]")) out.get(tags.get(0)).output(log);
            else if (log.getType().equals("[error]")) out.get(tags.get(1)).output(log);
            else if (log.getType().equals("[warn]")) out.get(tags.get(2)).output(log);
            else if (log.getType().equals("[sout]") ) out.get(tags.get(3)).output(log);
        }
    }

    public void run(){
        TupleTag<logSchema> all = new TupleTag<>();
        TupleTag<logSchema> noticesTag = new TupleTag<>();
        TupleTag<logSchema> errorsTag = new TupleTag<>();
        TupleTag<logSchema> warningsTag = new TupleTag<>();
        TupleTag<logSchema> soutTag = new TupleTag<>();
        TupleTagList tags = TupleTagList.of(noticesTag).and(errorsTag).and(warningsTag).and(soutTag);

        PipelineOptions options = PipelineOptionsFactory.create();

        Pipeline p = Pipeline.create();
        PCollection<String> input = p.apply(TextIO.read().from("C:...."));
        PCollection logObjects = input
                .apply("Conform", ParDo.of(
                        new conformToSchema()));

        PCollectionTuple multipleOutputs = (PCollectionTuple) logObjects.apply("Branch", ParDo.of(new branching(tags)).withOutputTags(all, tags));

        PCollection<logSchema> notices = multipleOutputs.get(noticesTag);
        PCollection<logSchema> errors = multipleOutputs.get(errorsTag);
        PCollection<logSchema> warning = multipleOutputs.get(warningsTag);
        PCollection<logSchema> sout = multipleOutputs.get(soutTag);

        try {
            p.run().waitUntilFinish();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
 

Кстати, это первый пост здесь, ура.