# #google-cloud-platform #apache-beam
Вопрос:
У нас есть несколько файлов в облачном хранилище.Мы читаем файл из расположения и в зависимости от типа файла обрабатываем файл и записываем в тему вывода .Тема записи в вывод зависит от типа файла.Вот код
PCollection<FileIO.ReadableFile> data = pipeline.
apply(FileIO.match().filepattern(options.getReadDir())
.continuously(Duration.standardSeconds(10), Watch.Growth.never()))
.apply(FileIO.readMatches().withCompression(Compression.AUTO));
PCollectionTuple outputData = data.apply(ParDo.of(new Transformer(tupleTagsMap, options.getBlockSize()))
.withOutputTags(TupleTags.customerTag, TupleTagList.of(tupleTagList))
);
outputData.get(TupleTags.topicA)
.apply("Write to customer topic",
PubsubIO.writeStrings().to(options.topicA));
outputData.get(TupleTags.topicB)
.apply("Write to transaction topic",
PubsubIO.writeStrings().to(options.topicB));
processContext.output(TupleTagsMap().get(importContext.getFileType().toString()), jsonBlock); This block of code is inside the transformer
Проблема здесь в том, что файл очень большой и содержит 100 миллионов записей .
Мы добавляем к приведенному выше processContext.вывод фрагментами, но когда он записывается в тему вывода, он записывается, когда вся обработка файла завершена.jsonBlock является одним из фрагментов .
Из-за этого мы получаем ошибку памяти при обработке большого файла .Причина в том, что это не записывается в тему вывода
Как решить эту проблему ?
Комментарии:
1. Каков класс трансформатора?
2. класс трансформатора расширяет DoFn Файл для чтения, строка> и у него есть бизнес-логика для чтения файла, преобразуйте его в строку, затем преобразуйте каждую строку в элемент массива и бизнес-обработки, а затем добавьте обработанный в блок, который является jasonblock, и добавьте в processContext.output(TupleTagsMap().get(importContext.GetFileType().toString ()), jsonBlock);
3. Я думаю, что ключ в методе трансформатора.
4. Я согласен, что трансформатор ПарДо звучит как проблема. Особенно, если вы читаете весь этот файл в память в виде строки, как вы утверждаете. Является ли эта логика чем-то, что вы могли бы реализовать как разделяемый DoFn? Или, по крайней мере, отрегулируйте его так, чтобы вы выводили элементы по мере чтения файла, а не загружали весь файл в память?
5. Я работаю над концепцией расщепляемого DoFn, так как до сих пор мы фокусировались на разделении вывода, но это не решило проблему, теперь мы изменили фокус на ввод . Надеюсь, если это сработает