Обработайте файл по частям в Apache Beam и ошибка в памяти

# #google-cloud-platform #apache-beam

Вопрос:

У нас есть несколько файлов в облачном хранилище.Мы читаем файл из расположения и в зависимости от типа файла обрабатываем файл и записываем в тему вывода .Тема записи в вывод зависит от типа файла.Вот код

     PCollection<FileIO.ReadableFile> data = pipeline.
                        apply(FileIO.match().filepattern(options.getReadDir())
                        .continuously(Duration.standardSeconds(10), Watch.Growth.never()))
                        .apply(FileIO.readMatches().withCompression(Compression.AUTO));
    
   
    
      PCollectionTuple outputData = data.apply(ParDo.of(new Transformer(tupleTagsMap, options.getBlockSize()))
                        .withOutputTags(TupleTags.customerTag, TupleTagList.of(tupleTagList))
                );
    
                outputData.get(TupleTags.topicA)
                        .apply("Write to customer topic",
                                PubsubIO.writeStrings().to(options.topicA));
    
                outputData.get(TupleTags.topicB)
                        .apply("Write to transaction topic",
                                PubsubIO.writeStrings().to(options.topicB));
    
    
      processContext.output(TupleTagsMap().get(importContext.getFileType().toString()), jsonBlock); This block of code is inside the transformer
 

Проблема здесь в том, что файл очень большой и содержит 100 миллионов записей .

Мы добавляем к приведенному выше processContext.вывод фрагментами, но когда он записывается в тему вывода, он записывается, когда вся обработка файла завершена.jsonBlock является одним из фрагментов .

Из-за этого мы получаем ошибку памяти при обработке большого файла .Причина в том, что это не записывается в тему вывода

Как решить эту проблему ?

Комментарии:

1. Каков класс трансформатора?

2. класс трансформатора расширяет DoFn Файл для чтения, строка> и у него есть бизнес-логика для чтения файла, преобразуйте его в строку, затем преобразуйте каждую строку в элемент массива и бизнес-обработки, а затем добавьте обработанный в блок, который является jasonblock, и добавьте в processContext.output(TupleTagsMap().get(importContext.GetFileType().toString ()), jsonBlock);

3. Я думаю, что ключ в методе трансформатора.

4. Я согласен, что трансформатор ПарДо звучит как проблема. Особенно, если вы читаете весь этот файл в память в виде строки, как вы утверждаете. Является ли эта логика чем-то, что вы могли бы реализовать как разделяемый DoFn? Или, по крайней мере, отрегулируйте его так, чтобы вы выводили элементы по мере чтения файла, а не загружали весь файл в память?

5. Я работаю над концепцией расщепляемого DoFn, так как до сих пор мы фокусировались на разделении вывода, но это не решило проблему, теперь мы изменили фокус на ввод . Надеюсь, если это сработает