Невозможно запустить задание с исходящими тегами

#google-cloud-dataflow

#google-cloud-поток данных

Вопрос:

Я боролся с заданием, которое должно выдавать побочные выходные данные, поскольку я продолжаю получать исключение («невозможно сериализовать xxx»).

Несмотря на то, что я явно указал кодировщика для типа, с которым я работаю, я продолжал получать ту же ошибку, поэтому я решил написать простое задание, следуя этой документации:

https://cloud.google.com/dataflow/model/par-do#tags-for-side-outputs

К моему удивлению, я все еще получаю такое же исключение, и теперь я подозреваю, что, должно быть, я сделал что-то не так (но я сам не могу понять это). Что касается кода, я попытался следовать примеру, приведенному выше.

Ниже я публикую исходный код, а также сообщение об ошибке, которое я получаю при его запуске. Я считаю, что это воспроизводимо (измените ‘GCS_BUCKET’ на любую корзину, которая у вас есть, и создайте метод main (), который вызывает ‘TestSideOutput’ с аргументами), и было бы полезно узнать, может ли кто-то еще воспроизвести на своем конце. Мы используем JDK 8 и Dataflow SDK 1.7.0.

Пожалуйста, обратите внимание, что в примере в документации выше используется анонимный класс, расширяющий DoFn, который я также пробовал, но получил такое же сообщение об ошибке; приведенный ниже код преобразует этот класс в именованный внутренний класс (‘Filter’) вместо этого.

Я также попытался инициализировать TupleTags без фигурных скобок («{}») — потому что это фактически выдает предупреждение — что приводит к исключению (см. Последний фрагмент кода в этом сообщении).

Вот код, который я использовал:

 package tmp.dataflow.experimental;

import com.google.cloud.dataflow.sdk.Pipeline;
import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder;
import com.google.cloud.dataflow.sdk.io.TextIO;
import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
import com.google.cloud.dataflow.sdk.runners.BlockingDataflowPipelineRunner;
import com.google.cloud.dataflow.sdk.transforms.DoFn;
import com.google.cloud.dataflow.sdk.transforms.ParDo;
import com.google.cloud.dataflow.sdk.values.PCollection;
import com.google.cloud.dataflow.sdk.values.PCollectionTuple;
import com.google.cloud.dataflow.sdk.values.TupleTag;
import com.google.cloud.dataflow.sdk.values.TupleTagList;
import com.moloco.dataflow.DataflowConstants;

public class TestSideOutput {
  private TestOptions options;
  private static final String GCS_BUCKET = "gs://dataflow-experimental/"; // Change to your bucket name

  public TestSideOutput(String[] args) {
    options = PipelineOptionsFactory.fromArgs(args).as(TestOptions.class);
    options.setProject(DataflowConstants.PROJCET_NAME);
    options.setStagingLocation(DataflowConstants.STAGING_BUCKET);
    options.setRunner(BlockingDataflowPipelineRunner.class);
    options.setJobName(options.getJob()   "-test-sideoutput");
  }

  public void execute() {
    Pipeline pipeline = Pipeline.create(options);
    // 1. Read sample data.
    PCollection<String> profiles = pipeline.apply(TextIO.Read.named("reading")
        .from(GCS_BUCKET   "example/sample-data/sample-data*").withCoder(StringUtf8Coder.of()));

    // 2. Create tags for outputs.
    final TupleTag<String> mainTag = new TupleTag<String>() {};
    final TupleTag<String> sideTag = new TupleTag<String>() {};

    // 3. Apply ParDo with side output tags.
    Filter filter = new Filter("DATAFLOW", sideTag);
    PCollectionTuple results =
        profiles.apply(ParDo.named("FilterByKeyword").withOutputTags(mainTag, TupleTagList.of(sideTag)).of(filter));

    // 4. Retrieve outputs.
    PCollection<String> mainOutput = results.get(mainTag);
    PCollection<String> sideOutput = results.get(sideTag);

    // 5. Write to GCS.
    mainOutput.apply(
        TextIO.Write.named("writingMain").to(GCS_BUCKET   "example/main-output/main").withCoder(StringUtf8Coder.of()));
    sideOutput.apply(
        TextIO.Write.named("writingSide").to(GCS_BUCKET   "example/side-output/side").withCoder(StringUtf8Coder.of()));

    // 6. Run pipeline.
    pipeline.run();
  }

  static class Filter extends DoFn<String, String> {
    private static final long serialVersionUID = 0;
    final TupleTag<String> sideTag;
    String keyword;

    public Filter(String keyword, TupleTag<String> sideTag) {
      this.sideTag = sideTag;
      this.keyword = keyword;
    }

    @Override
    public void processElement(ProcessContext c) throws Exception {
      String profile = c.element();
      if (profile.contains(keyword)) {
        c.output(profile);
      } else {
        c.sideOutput(sideTag, profile);
      }
    }
  }
}
  

И это команда, которую я использовал, и ошибка / исключение, которое я получил (она содержит только несколько аргументов командной строки, которые мы используем для нашего пакета потока данных, здесь ничего особенного, просто чтобы дать вам представление):

 dataflow-20161003.R3$ ./bin/dataflow --job=test-experimental-sideoutput --numWorkers=1 --date=0001-01-01
Oct 04, 2016 12:37:34 AM com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner fromOptions
INFO: PipelineOptions.filesToStage was not specified. Defaulting to files from the classpath: will stage 121 files. Enable logging at DEBUG level to see which files will be staged.
Exception in thread "main" java.lang.IllegalArgumentException: unable to serialize tmp.dataflow.experimental.TestSideOutput$Filter@6986852
        at com.google.cloud.dataflow.sdk.util.SerializableUtils.serializeToByteArray(SerializableUtils.java:54)
        at com.google.cloud.dataflow.sdk.util.SerializableUtils.clone(SerializableUtils.java:91)
        at com.google.cloud.dataflow.sdk.transforms.ParDo$BoundMulti.<init>(ParDo.java:959)
        at com.google.cloud.dataflow.sdk.transforms.ParDo$UnboundMulti.of(ParDo.java:912)
        at com.google.cloud.dataflow.sdk.transforms.ParDo$UnboundMulti.of(ParDo.java:908)
        at tmp.dataflow.experimental.TestSideOutput.execute(TestSideOutput.java:41)
        at com.moloco.dataflow.Main.main(Main.java:152)
Caused by: java.io.NotSerializableException: tmp.dataflow.experimental.TestSideOutput
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
        at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
        at com.google.cloud.dataflow.sdk.util.SerializableUtils.serializeToByteArray(SerializableUtils.java:50)
        ... 6 more
  

In addition, I don’t think this is relevant, but the code for the ‘TestOptions’ class:

 package tmp.dataflow.experimental;

import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
import com.google.cloud.dataflow.sdk.options.Description;
import com.google.cloud.dataflow.sdk.options.Validation;

public interface TestOptions extends DataflowPipelineOptions {
  @Description("Job")
  @Validation.Required
  String getJob();

  void setJob(String value);

  @Description("Job suffix")
  String getJobSuffix();

  void setJobSuffix(String value);

  @Description("Date")
  @Validation.Required
  String getDate();

  void setDate(String value);
}
  

Наконец, если бы я удалил фигурные скобки «{}» при создании экземпляров TupleTags, я бы вместо этого получил следующее исключение (и я нашел в Stackoverflow предложения о том, что я должен создавать их с помощью «{}», чтобы избежать подобных проблем):

 Oct 04, 2016 12:43:56 AM com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner fromOptions
INFO: PipelineOptions.filesToStage was not specified. Defaulting to files from the classpath: will stage 122 files. Enable logging at DEBUG level to see which files will be staged.
Exception in thread "main" java.lang.IllegalStateException: Unable to return a default Coder for FilterByKeyword.out1 [PCollection]. Correct one of the following root causes:
  No Coder has been manually specified;  you may do so using .setCoder().
  Inferring a Coder from the CoderRegistry failed: Cannot provide a coder for type variable V (declared by class com.google.cloud.dataflow.sdk.values.TupleTag) because the actual type is unknown due to erasure. If this error occurs for a side output of the producing ParDo, verify that the TupleTag for this output is constructed with proper type information (see TupleTag Javadoc) or explicitly set the Coder to use if this is not possible.
  Using the default output Coder from the producing PTransform failed: Cannot provide a coder for type variable V (declared by class com.google.cloud.dataflow.sdk.values.TupleTag) because the actual type is unknown due to erasure.
        at com.google.cloud.dataflow.sdk.values.TypedPValue.inferCoderOrFail(TypedPValue.java:195)
        at com.google.cloud.dataflow.sdk.values.TypedPValue.getCoder(TypedPValue.java:48)
        at com.google.cloud.dataflow.sdk.values.PCollection.getCoder(PCollection.java:137)
        at com.google.cloud.dataflow.sdk.values.TypedPValue.finishSpecifying(TypedPValue.java:88)
        at com.google.cloud.dataflow.sdk.Pipeline.applyInternal(Pipeline.java:331)
        at com.google.cloud.dataflow.sdk.Pipeline.applyTransform(Pipeline.java:274)
        at com.google.cloud.dataflow.sdk.values.PCollection.apply(PCollection.java:161)
        at tmp.dataflow.experimental.TestSideOutput.execute(TestSideOutput.java:50)
        at com.moloco.dataflow.Main.main(Main.java:152)
  

Редактировать: смотрите Ответ ниже для решения этой проблемы, сделав execute () «статическим».

Приведенный ниже код напоминает то, что я первоначально опубликовал, с двумя изменениями: всякий раз, когда это возможно, я снова указываю явно (и избыточно) «coder» для каждой коллекции PC. Кроме того, при создании экземпляра TupleTags фигурные скобки отсутствуют.

Обратите внимание, какой подход (статический или этот избыточный подход) является более подходящим.

   public void execute() {
    Pipeline pipeline = Pipeline.create(options);
    // 1. Read sample data.
    PCollection<String> profiles = pipeline.apply(TextIO.Read.named("reading")
        .from(GCS_BUCKET   "example/sample-data/sample-data*").withCoder(StringUtf8Coder.of()));

    // 2. Create tags for outputs.
    final TupleTag<String> mainTag = new TupleTag<String>();
    final TupleTag<String> sideTag = new TupleTag<String>();

    // 3. Apply ParDo with side output tags.
    Filter filter = new Filter("DATAFLOW", sideTag);
    PCollectionTuple results = profiles.setCoder(StringUtf8Coder.of())
        .apply(ParDo.named("FilterByKeyword").withOutputTags(mainTag, TupleTagList.of(sideTag)).of(filter));

    // 4. Retrieve outputs.
    PCollection<String> mainOutput = results.get(mainTag);
    PCollection<String> sideOutput = results.get(sideTag);

    // 5. Write to GCS.
    mainOutput.setCoder(StringUtf8Coder.of()).apply(TextIO.Write.named("writingMain")
        .to(GCS_BUCKET   "example/main-output-from-nonstatic/main").withCoder(StringUtf8Coder.of()));
    sideOutput.setCoder(StringUtf8Coder.of()).apply(TextIO.Write.named("writingSide")
        .to(GCS_BUCKET   "example/side-output-from-nonstatic/side").withCoder(StringUtf8Coder.of()));

    // 6. Run pipeline.
    pipeline.run();
  }
  

Ответ №1:

Ошибка, которую вы получаете, заключается в том, что ваш Filter fn ссылается на TupleTag , который, в свою очередь (поскольку это нестатический анонимный класс, созданный из нестатической функции execute() ) ссылается на вложение TestSideOutput .

Итак, конвейер пытается сериализовать TestSideOutput объект, и он не сериализуем, о чем свидетельствует сообщение: java.io.NotSerializableException: tmp.dataflow.experimental.TestSideOutput .

Основная причина заключается в том, что метод execute() не является статическим. Сделать его статичным должно устранить проблему.

Комментарии:

1. Действительно, то, что предложил йо, устранило проблему, с которой я столкнулся. Спасибо! С другой стороны, у нас есть другое задание, в котором есть нестатический метод execute(), из которого мы применяем ParDo с побочными тегами вывода, и он не генерирует исключение (и отчасти поэтому я написал пример кода выше, поскольку он показался мне странным). На данный момент я не могу опубликовать эту стоимость, но мне интересно, есть ли другой способ решить эту проблему, не делая метод execute() статическим?

2. Я вроде как ответил на свой последующий вопрос (см. Добавленный фрагмент кода в конце моего отредактированного вопроса). Представляется возможным сохранить execute() как нестатический, по возможности явно и избыточно объявляя кодировщиков.