Альтернатива для BufferedOutputStream?

#java #hadoop #apache-spark #bufferedoutputstream

#java #hadoop #apache-spark #bufferedoutputstream

Вопрос:

Это мой фрагмент кода

 @Override
    protected RecordWriter<String, String> getBaseRecordWriter(
            FileSystem fs, JobConf job, String name, Progressable arg3)
                    throws IOException {
        Path file2 = FileOutputFormat.getOutputPath(job);
        String path = file2.toUri().getPath() File.separator  name;
        FSDataOutputStream fileOut = new FSDataOutputStream( new BufferedOutputStream(new FileOutputStream(path, true),104857600)), null);
        return new LineRecordWriter<String, String>(fileOut, "t");
    }
  

я использую Spark 1.6.1, и в моем коде я использовал saveAsHadoopFile() метод, для которого я пишу класс OutputFormat, производный от org.apache.hadoop.mapred.lib.MultipleTextOutputFormat, и я перезаписываю вышеупомянутый метод.

В кластере он записывает поврежденные записи в выходные файлы. я думаю, это из-за BufferedOutputStream в

 FSDataOutputStream fileOut = new FSDataOutputStream(
                 new BufferedOutputStream(new FileOutputStream(path, true),104857600)), null);
  

Можем ли мы иметь какую-либо альтернативу для bufferedOutputStream , поскольку он записывает, как только буфер заполняется.

Примечание: обновлен код. Извините за неудобства.

Комментарии:

1. В вашем коде нет BufferedOutputStream , не говоря уже о каких-либо доказательствах вашей уверенности в том, что это вызывает повреждение данных. Неясно, о чем вы спрашиваете, и вероятная проблема XY.

2. Единственное повреждение, которое может вызвать BufferedOutputStream, — это усеченный файл, но только в том случае, если вы не сможете очистить () или закрыть () его.

3. Я обновил код. Я пробовал разные комбинации, поэтому получил неправильную.

4. Тот факт, что BufferedOutputStream «записывается, как только буфер заполняется», не имеет никакого отношения к повреждению данных. Вы все еще не объяснили свои рассуждения.

5. У меня проблема .. в кластере каждый рабочий попытается записать в один и тот же (общий) файл, поскольку оба рабочих на разных машинах означают разные JVM и, следовательно, синхронизированная запись в файл здесь не будет работать. вот почему поврежденные записи. Также я использовал NFS, что является важным фактором. Я действительно хочу получить блокировку записи на уровне файла перед записью в файл. любые указания на это будут полезны

Ответ №1:

У меня проблема .. в кластере каждый работник попытается записать в один и тот же (общий) файл, поскольку оба работника на разных машинах означают разные JVM, и, следовательно, синхронизированная запись в файл здесь не будет работать. вот почему поврежденные записи. Также я использовал NFS, что является важным фактором.