#java #google-cloud-platform #google-cloud-storage #google-cloud-dataflow
#java #google-облачная платформа #google-облачное хранилище #google-облако-поток данных
Вопрос:
Я передаю файлы в хранилище GCP (bucket). Это приводит к частой ошибке (примерно 2 миллиона раз в день), утверждающей, что моя политика имен файлов должна генерировать уникальное имя. Я пробовал несколько способов гарантировать уникальное имя, таких как использование currentTimeMillis, currentThread, шифрование имени файла и т.д… Все они, похоже, устраняют ошибку на пару часов / день, прежде чем она возвращается с той же частотой. Я не сталкивался с ситуацией, когда файл отсутствовал, но я также не проводил тщательного поиска. Это мой автор:
pipeline.apply("Read PubSub Events", PubsubIO.readMessagesWithAttributes().fromSubscription(options.getInputSubscription()))
.apply(options.getWindowDuration() " Window",
Window.<PubsubMessage>into(FixedWindows.of(parseDuration(options.getWindowDuration())))
.triggering(AfterWatermark.pastEndOfWindow())
.discardingFiredPanes()
.withAllowedLateness(parseDuration("24h")))
.apply(new GenericFunctions.extractMsg())
.apply(FileIO.<String, String>writeDynamic()
.by(new datePartition(options.getOutputFilenamePrefix()))
.via(TextIO.sink())
.withNumShards(options.getNumShards())
.to(options.getOutputDirectory())
.withNaming(type -> FileIO.Write.defaultNaming(type, ".txt"))
.withDestinationCoder(StringUtf8Coder.of()));
Я могу подробнее рассказать о любых методах, отличных от Google, которые мы используем, если это необходимо. Это следствие ошибки, возвращаемой StackDriver:
exception: "java.lang.RuntimeException: org.apache.beam.sdk.util.UserCodeException: java.lang.IllegalArgumentException: Filename policy must generate unique filenames, but generated the same name gs://myBucket/myProject/2019/01/21/13h/myProject-2123065519-2019-01-21T12:58:00.000Z-2019-01-21T13:00:00.000Z-00000-of-00001.txt for file results FileResult{tempFilename=gs://myBucket/myProject/.temp-beam-2019-01-17_20-30-37-1/2e2c99ef-6508-4f10-ada4-e5c108b1d884, shard=0, window=[2019-01-21T12:58:00.000Z..2019-01-21T13:00:00.000Z), paneInfo=PaneInfo{isFirst=true, isLast=true, timing=ON_TIME, index=0, onTimeIndex=0}} and FileResult{tempFilename=gs://myBucket/myProject/.temp-beam-2019-01-17_20-30-37-1/2112c27f-8e0c-4831-b3fe-fbefe9ed560e, shard=0, window=[2019-01-21T12:58:00.000Z..2019-01-21T13:00:00.000Z), paneInfo=PaneInfo{isFirst=true, isLast=true, timing=ON_TIME, index=0, onTimeIndex=0}}
at org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowsParDoFn$1.output(GroupAlsoByWindowsParDoFn.java:185)
at org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowFnRunner$1.outputWindowedValue(GroupAlsoByWindowFnRunner.java:102)
at org.apache.beam.runners.dataflow.worker.StreamingGroupAlsoByWindowReshuffleFn.processElement(StreamingGroupAlsoByWindowReshuffleFn.java:58)
at org.apache.beam.runners.dataflow.worker.StreamingGroupAlsoByWindowReshuffleFn.processElement(StreamingGroupAlsoByWindowReshuffleFn.java:40)
at org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowFnRunner.invokeProcessElement(GroupAlsoByWindowFnRunner.java:115)
at org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowFnRunner.processElement(GroupAlsoByWindowFnRunner.java:73)
at org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowsParDoFn.processElement(GroupAlsoByWindowsParDoFn.java:135)
at org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:45)
at org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:50)
at org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:202)
at org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:160)
at org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:77)
at org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1226)
at org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:141)
at org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$6.run(StreamingDataflowWorker.java:965)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.beam.sdk.util.UserCodeException: java.lang.IllegalArgumentException: Filename policy must generate unique filenames, but generated the same name gs://myBucket/myProject/2019/01/21/13h/myProject-2123065519-2019-01-21T12:58:00.000Z-2019-01-21T13:00:00.000Z-00000-of-00001.txt for file results FileResult{tempFilename=gs://myBucket/myProject/.temp-beam-2019-01-17_20-30-37-1/2e2c99ef-6508-4f10-ada4-e5c108b1d884, shard=0, window=[2019-01-21T12:58:00.000Z..2019-01-21T13:00:00.000Z), paneInfo=PaneInfo{isFirst=true, isLast=true, timing=ON_TIME, index=0, onTimeIndex=0}} and FileResult{tempFilename=gs://myBucket/myProject/.temp-beam-2019-01-17_20-30-37-1/2112c27f-8e0c-4831-b3fe-fbefe9ed560e, shard=0, window=[2019-01-21T12:58:00.000Z..2019-01-21T13:00:00.000Z), paneInfo=PaneInfo{isFirst=true, isLast=true, timing=ON_TIME, index=0, onTimeIndex=0}}
at org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:34)
at org.apache.beam.sdk.io.WriteFiles$FinalizeTempFileBundles$FinalizeFn$DoFnInvoker.invokeProcessElement(Unknown Source)
at org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:275)
at org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:240)
at org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processElement(SimpleParDoFn.java:326)
at org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:45)
at org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:50)
at org.apache.beam.runners.dataflow.worker.SimpleParDoFn$1.output(SimpleParDoFn.java:273)
at org.apache.beam.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:309)
at org.apache.beam.runners.core.SimpleDoFnRunner.access$700(SimpleDoFnRunner.java:77)
at org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:621)
at org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.output(DoFnOutputReceivers.java:71)
at org.apache.beam.sdk.transforms.MapElements$1.processElement(MapElements.java:128)
at org.apache.beam.sdk.transforms.MapElements$1$DoFnInvoker.invokeProcessElement(Unknown Source)
at org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:275)
at org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:240)
at org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processElement(SimpleParDoFn.java:326)
at org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:45)
at org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:50)
at org.apache.beam.runners.dataflow.worker.SimpleParDoFn$1.output(SimpleParDoFn.java:273)
at org.apache.beam.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:309)
at org.apache.beam.runners.core.SimpleDoFnRunner.access$700(SimpleDoFnRunner.java:77)
at org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:621)
at org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:609)
at org.apache.beam.runners.dataflow.ReshuffleOverrideFactory$ReshuffleWithOnlyTrigger$1.processElement(ReshuffleOverrideFactory.java:85)
at org.apache.beam.runners.dataflow.ReshuffleOverrideFactory$ReshuffleWithOnlyTrigger$1$DoFnInvoker.invokeProcessElement(Unknown Source)
at org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:275)
at org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:240)
at org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processElement(SimpleParDoFn.java:326)
at org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:45)
at org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:50)
at org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowsParDoFn$1.output(GroupAlsoByWindowsParDoFn.java:183)
... 17 more
Caused by: java.lang.IllegalArgumentException: Filename policy must generate unique filenames, but generated the same name gs://myBucket/myProject/2019/01/21/13h/myProject-2123065519-2019-01-21T12:58:00.000Z-2019-01-21T13:00:00.000Z-00000-of-00001.txt for file results FileResult{tempFilename=gs://myBucket/myProject/.temp-beam-2019-01-17_20-30-37-1/2e2c99ef-6508-4f10-ada4-e5c108b1d884, shard=0, window=[2019-01-21T12:58:00.000Z..2019-01-21T13:00:00.000Z), paneInfo=PaneInfo{isFirst=true, isLast=true, timing=ON_TIME, index=0, onTimeIndex=0}} and FileResult{tempFilename=gs://myBucket/myProject/.temp-beam-2019-01-17_20-30-37-1/2112c27f-8e0c-4831-b3fe-fbefe9ed560e, shard=0, window=[2019-01-21T12:58:00.000Z..2019-01-21T13:00:00.000Z), paneInfo=PaneInfo{isFirst=true, isLast=true, timing=ON_TIME, index=0, onTimeIndex=0}}
at org.apache.beam.repackaged.beam_sdks_java_core.com.google.common.base.Preconditions.checkArgument(Preconditions.java:399)
at org.apache.beam.sdk.io.FileBasedSink$WriteOperation.finalizeDestination(FileBasedSink.java:656)
at org.apache.beam.sdk.io.WriteFiles.finalizeAllDestinations(WriteFiles.java:819)
at org.apache.beam.sdk.io.WriteFiles.access$1600(WriteFiles.java:112)
at org.apache.beam.sdk.io.WriteFiles$FinalizeTempFileBundles$FinalizeFn.process(WriteFiles.java:796)
"
Ответ №1:
Я бы использовал методы, которые были созданы для этого. Например, UUID.randomUUID().toString() генерирует UUID типа 4, который может использоваться для присвоения файлам уникальных имен.
Комментарии:
1. Вы также можете добавить к файлу что-то, что вы узнаете, и добавить с рекомендацией stdunbar. Что-то вроде
MyFolder/picture-${DATE}-${UUID}.jpg
2. Я попробую это сделать. При этом, почему мой метод не сработал? Мне трудно представить, что шифрование строки, содержащей threadname и currentTimeMillis, будет часто генерировать одну и ту же строку.
3. Итак, я попробовал, и исключений с повторяющимися именами файлов пока нет. Основываясь на моих предыдущих попытках, они все еще могут отображаться. При этом теперь существует множество ошибок нехватки памяти / места в куче и даже ошибок сборки мусора.
4. Я попробовал еще раз и посмотрел более подробно. Как ни странно, использование этого метода, похоже, решило проблему с дублированием имени файла (по крайней мере временно), но теперь выводит другой файл для каждого отдельного журнала вместо объединения нескольких журналов в один файл, как указано в продолжительности моего окна. Есть идеи относительно того, почему это может произойти?
5. Обновление: похоже, что это устранило ошибку. Я не уверен, было ли это напрямую из-за этого, но я не получал никаких ошибок с дублированием имени файла с тех пор, как я внес это изменение. Я отмечу этот ответ как принятый. При этом, если кто-нибудь знает, не могли бы вы объяснить, с чего началась ошибка?