Запись в облачное хранилище из потока данных с помощью динамического времени данных

# #java #google-cloud-storage #google-cloud-dataflow #apache-beam

Вопрос:

Мы создаем конвейер Apache Beam (Java SDK), который записывает данные в облачное хранилище. Мы используем TextIO.write() преобразование для записи в хранилище. В рамках этой операции мы хотели бы динамически изменять подкаталог, в котором хранится файл, в зависимости от текущей даты и времени.

Это часть потоковой передачи. В идеале мы хотели бы развернуть его и позволить заданию Beam динамически изменять подкаталог папки, в которой он сохраняет файл, в зависимости от даты и времени обработки.

Наше текущее преобразование трубопровода выглядит следующим образом:

 DateTimeFormatter dtfOut = DateTimeFormat.forPattern("yyyy/MM/dd");

myPCollection.apply(TextIO.write().to("gs://my-bucket/%s", dtfOut.print(new DateTime()));
 

Проблема с этим кодом заключается в том, что значение даты и времени, возвращаемое функцией, застревает на том же значении, что и при развертывании конвейера в потоке данных Google Cloud. Мы хотели бы динамически изменять структуру подкаталогов в зависимости от даты и времени во время обработки входящего сообщения.

Я собирался:

  1. Получите дату — время из функции ParDo.
  2. Создайте новую функцию ParDo, используйте сообщение в качестве основного ввода и передайте дату-время из другой функции ParDo в качестве бокового ввода.

Является ли это лучшим подходом? Существуют ли встроенные инструменты в Apache Beam, которые могут решить наш вариант использования?

Комментарии:

1. Это может помочь вам: beam.apache.org/releases/javadoc/2.4.0/org/apache/beam/sdk/…

2. Спасибо за ваш ответ. Я все еще не понимаю, как я могу использовать класс ValueProvider.RuntimeValueProvider для создания пути к подкаталогу во время выполнения. Не могли бы вы, пожалуйста, сказать мне, как я могу включить dftOut.print(...) метод в класс ValueProvider?

Ответ №1:

FileIO предоставляет метод writeDynamic (), который позволяет направлять каждый элемент коллекции PC в другой каталог или файл с учетом содержимого самого элемента.

Я помещаю ниже простой пример, который я создал просто для демонстрации:

 public class ExamplePipeline {
public static void main(String[] args) {
    PipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().create();
    Pipeline pipeline = Pipeline.create(options);


    Create.Values<String> sampleData = Create.of("ABC","DEF", "GHI");

    pipeline.apply(sampleData)
            .apply("WritingDynamic", FileIO.<PartitionData, String>writeDynamic()
                    .by(event -> new PartitionData())
                    .withDestinationCoder(AvroCoder.of(PartitionData.class))
                    .via(Contextful.fn(event -> event), TextIO.sink())
                    .to("gs://my-bucket/")
                    .withNaming(partitionData -> FileIO.Write.defaultNaming(partitionData.getPath(), ".txt")));

    pipeline.run().waitUntilFinish();
}

public static class PartitionData implements Serializable {
    private static final long serialVersionUID = 4549174774076944665L;

    public String getPath() {
        LocalDateTime writtingMoment = LocalDateTime.now(ZoneOffset.UTC);
        int year = writtingMoment.getYear();
        int month = writtingMoment.getMonthValue();
        int day = writtingMoment.getDayOfMonth();

        return String.format("%d/d/d/", year, month, day);
    }
}
 

Приведенный выше код будет сохранен в структуру:
gs://my-bucket/${year}/%{month}/%{day}/... .txt

В by() методе я использовал свой разделитель данных, который я вызвал PartitionData .

via() Должно быть возвращено то, что вы хотите, чтобы в конечном итоге было записано в качестве содержимого.

Это to() будет основной частью вашего пути.

И в withNaming вас действительно строится заключительная часть вашего пути.

Обычно у меня были бы отметки времени на самих событиях, указывающие, когда они произошли в реальности, тогда вы могли бы получить информацию о событии вместо использования a LocalDateTime.now .