#java #hadoop #apache-spark #bufferedoutputstream
#java #hadoop #apache-spark #bufferedoutputstream
Вопрос:
Это мой фрагмент кода
@Override
protected RecordWriter<String, String> getBaseRecordWriter(
FileSystem fs, JobConf job, String name, Progressable arg3)
throws IOException {
Path file2 = FileOutputFormat.getOutputPath(job);
String path = file2.toUri().getPath() File.separator name;
FSDataOutputStream fileOut = new FSDataOutputStream( new BufferedOutputStream(new FileOutputStream(path, true),104857600)), null);
return new LineRecordWriter<String, String>(fileOut, "t");
}
я использую Spark 1.6.1, и в моем коде я использовал saveAsHadoopFile()
метод, для которого я пишу класс OutputFormat, производный от org.apache.hadoop.mapred.lib.MultipleTextOutputFormat, и я перезаписываю вышеупомянутый метод.
В кластере он записывает поврежденные записи в выходные файлы. я думаю, это из-за BufferedOutputStream
в
FSDataOutputStream fileOut = new FSDataOutputStream(
new BufferedOutputStream(new FileOutputStream(path, true),104857600)), null);
Можем ли мы иметь какую-либо альтернативу для bufferedOutputStream
, поскольку он записывает, как только буфер заполняется.
Примечание: обновлен код. Извините за неудобства.
Комментарии:
1. В вашем коде нет
BufferedOutputStream
, не говоря уже о каких-либо доказательствах вашей уверенности в том, что это вызывает повреждение данных. Неясно, о чем вы спрашиваете, и вероятная проблема XY.2. Единственное повреждение, которое может вызвать BufferedOutputStream, — это усеченный файл, но только в том случае, если вы не сможете очистить () или закрыть () его.
3. Я обновил код. Я пробовал разные комбинации, поэтому получил неправильную.
4. Тот факт, что
BufferedOutputStream
«записывается, как только буфер заполняется», не имеет никакого отношения к повреждению данных. Вы все еще не объяснили свои рассуждения.5. У меня проблема .. в кластере каждый рабочий попытается записать в один и тот же (общий) файл, поскольку оба рабочих на разных машинах означают разные JVM и, следовательно, синхронизированная запись в файл здесь не будет работать. вот почему поврежденные записи. Также я использовал NFS, что является важным фактором. Я действительно хочу получить блокировку записи на уровне файла перед записью в файл. любые указания на это будут полезны
Ответ №1:
У меня проблема .. в кластере каждый работник попытается записать в один и тот же (общий) файл, поскольку оба работника на разных машинах означают разные JVM, и, следовательно, синхронизированная запись в файл здесь не будет работать. вот почему поврежденные записи. Также я использовал NFS, что является важным фактором.