#java #apache-spark
Вопрос:
Я пытаюсь это сделать:
private final String charset8859 = "ISO-8859-1";
private final String charsetUtf8 = "UTF-8";
private String partnerFile8859 = "src/test/resources/D10410.QUALSCSV";
public SparkSession getOrCreateSparkSession(){
SparkConf conf = new SparkConf().setAppName("SparkSample").setMaster("local[*]");
SparkSession sparkSession = SparkSession
.builder()
.config(conf)
.getOrCreate();
return sparkSession;
}
public void withCharset2(JavaSparkContext context, String location, String charset) throws UnsupportedEncodingException
{
if (Charset.forName(charset) == DEFAULT_CHARSET) {
JavaRDD<String> result = context.textFile(location,1);
} else {
//val file = sparkContext.hadoopFile[LongWritable, Text, TextInputFormat](path)
////return new String(pair._2.getBytes(), 0, pair._2.getLength(), charset);
// can't pass a Charset object here cause its not serializable
// TODO: maybe use mapPartitions instead?
JavaPairRDD<LongWritable,Text> rdd = context.hadoopFile(location, TextInputFormat.class, LongWritable.class, Text.class);
rdd.map(pair -> {
String s = new String(pair._2.getBytes(), 0, pair._2.getLength(), charset);
return new String(pair._2.getBytes(), 0, pair._2.getLength(), charset);
});
rdd.collect();
}
}
@Test
public void getTextWithCharset() throws UnsupportedEncodingException, FileNotFoundException {
SparkSession sparkSession = getOrCreateSparkSession();
JavaSparkContext sparkContext = JavaSparkContext.fromSparkContext(sparkSession.sparkContext());
withCharset2(sparkContext, partnerFile8859, charset8859);
}
но выбрасывает эту ошибку:
ДИСПЕТЧЕР задач ОШИБОК: задача 1.0 на этапе 0.0 (TID 1) привела к несериализуемому результату: стек сериализации org.apache.hadoop.io.LongWritable: — объект, не сериализуемый (класс: org.apache.hadoop.io.LongWritable, значение: 1166) — поле (класс: scala.Tuple2, имя: _1, тип: класс java.lang.Объект) — объект (класс scala.Tuple2, (1166,»53″;»S?cio sem Capital»)) — элемент массива (индекс: 0) — массив (класс [Lscala.Tuple2;, размер 44); не повторяется
Я пытаюсь перенести пример из https://github.com/databricks/spark-csv/blob/master/src/main/scala/com/databricks/spark/csv/util/TextFile.scala к API Java Spark
Ответ №1:
Я исправляю это, делая это:
public void withCharset2(JavaSparkContext context, String location, String charset) throws UnsupportedEncodingException
{
if (Charset.forName(charset) == DEFAULT_CHARSET) {
JavaRDD<String> result = context.textFile(location,1);
} else {
//val file = sparkContext.hadoopFile[LongWritable, Text, TextInputFormat](path)
////return new String(pair._2.getBytes(), 0, pair._2.getLength(), charset);
// can't pass a Charset object here cause its not serializable
// TODO: maybe use mapPartitions instead?
JavaPairRDD<LongWritable,Text> rdd = context.hadoopFile(location, TextInputFormat.class, LongWritable.class, Text.class);
JavaRDD<Text> values = rdd.values();
JavaRDD<String> textRDD = values.map(text -> {
String s = new String(text.getBytes(), 0, text.getLength(), charset);
System.out.printf("textRDD map: %s | orignal string: %s n",s, text.toString());
//textRDD map: "08";"Conselheiro de Administração" | orignal string: "08";"Conselheiro de Administra??o"
return s;
});
textRDD.collect();
}
}