# #java #google-cloud-storage #google-cloud-dataflow #apache-beam
Вопрос:
Мы создаем конвейер Apache Beam (Java SDK), который записывает данные в облачное хранилище. Мы используем TextIO.write()
преобразование для записи в хранилище. В рамках этой операции мы хотели бы динамически изменять подкаталог, в котором хранится файл, в зависимости от текущей даты и времени.
Это часть потоковой передачи. В идеале мы хотели бы развернуть его и позволить заданию Beam динамически изменять подкаталог папки, в которой он сохраняет файл, в зависимости от даты и времени обработки.
Наше текущее преобразование трубопровода выглядит следующим образом:
DateTimeFormatter dtfOut = DateTimeFormat.forPattern("yyyy/MM/dd");
myPCollection.apply(TextIO.write().to("gs://my-bucket/%s", dtfOut.print(new DateTime()));
Проблема с этим кодом заключается в том, что значение даты и времени, возвращаемое функцией, застревает на том же значении, что и при развертывании конвейера в потоке данных Google Cloud. Мы хотели бы динамически изменять структуру подкаталогов в зависимости от даты и времени во время обработки входящего сообщения.
Я собирался:
- Получите дату — время из функции ParDo.
- Создайте новую функцию ParDo, используйте сообщение в качестве основного ввода и передайте дату-время из другой функции ParDo в качестве бокового ввода.
Является ли это лучшим подходом? Существуют ли встроенные инструменты в Apache Beam, которые могут решить наш вариант использования?
Комментарии:
1. Это может помочь вам: beam.apache.org/releases/javadoc/2.4.0/org/apache/beam/sdk/…
2. Спасибо за ваш ответ. Я все еще не понимаю, как я могу использовать класс ValueProvider.RuntimeValueProvider для создания пути к подкаталогу во время выполнения. Не могли бы вы, пожалуйста, сказать мне, как я могу включить
dftOut.print(...)
метод в класс ValueProvider?
Ответ №1:
FileIO предоставляет метод writeDynamic (), который позволяет направлять каждый элемент коллекции PC в другой каталог или файл с учетом содержимого самого элемента.
Я помещаю ниже простой пример, который я создал просто для демонстрации:
public class ExamplePipeline {
public static void main(String[] args) {
PipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().create();
Pipeline pipeline = Pipeline.create(options);
Create.Values<String> sampleData = Create.of("ABC","DEF", "GHI");
pipeline.apply(sampleData)
.apply("WritingDynamic", FileIO.<PartitionData, String>writeDynamic()
.by(event -> new PartitionData())
.withDestinationCoder(AvroCoder.of(PartitionData.class))
.via(Contextful.fn(event -> event), TextIO.sink())
.to("gs://my-bucket/")
.withNaming(partitionData -> FileIO.Write.defaultNaming(partitionData.getPath(), ".txt")));
pipeline.run().waitUntilFinish();
}
public static class PartitionData implements Serializable {
private static final long serialVersionUID = 4549174774076944665L;
public String getPath() {
LocalDateTime writtingMoment = LocalDateTime.now(ZoneOffset.UTC);
int year = writtingMoment.getYear();
int month = writtingMoment.getMonthValue();
int day = writtingMoment.getDayOfMonth();
return String.format("%d/d/d/", year, month, day);
}
}
Приведенный выше код будет сохранен в структуру:
gs://my-bucket/${year}/%{month}/%{day}/... .txt
В by()
методе я использовал свой разделитель данных, который я вызвал PartitionData
.
via()
Должно быть возвращено то, что вы хотите, чтобы в конечном итоге было записано в качестве содержимого.
Это to()
будет основной частью вашего пути.
И в withNaming
вас действительно строится заключительная часть вашего пути.
Обычно у меня были бы отметки времени на самих событиях, указывающие, когда они произошли в реальности, тогда вы могли бы получить информацию о событии вместо использования a LocalDateTime.now
.