Объект, не сериализуемый (класс: org.apache.hadoop.io.Записываемый, значение: 1166)

#java #apache-spark

Вопрос:

Я пытаюсь это сделать:

 private final String charset8859 = "ISO-8859-1";
private final String charsetUtf8 = "UTF-8";
private String partnerFile8859 = "src/test/resources/D10410.QUALSCSV";

public SparkSession getOrCreateSparkSession(){
    SparkConf conf = new SparkConf().setAppName("SparkSample").setMaster("local[*]");
    SparkSession sparkSession = SparkSession
            .builder()
            .config(conf)
            .getOrCreate();

    return sparkSession;
}

public void withCharset2(JavaSparkContext context, String location, String charset) throws UnsupportedEncodingException
{
    if (Charset.forName(charset) == DEFAULT_CHARSET) {
        JavaRDD<String> result = context.textFile(location,1);
    } else {
        //val file = sparkContext.hadoopFile[LongWritable, Text, TextInputFormat](path)
        ////return new String(pair._2.getBytes(), 0, pair._2.getLength(), charset);
        // can't pass a Charset object here cause its not serializable
        // TODO: maybe use mapPartitions instead?
        JavaPairRDD<LongWritable,Text> rdd = context.hadoopFile(location, TextInputFormat.class, LongWritable.class, Text.class);

        rdd.map(pair -> {
            String s = new String(pair._2.getBytes(), 0, pair._2.getLength(), charset);
            return new String(pair._2.getBytes(), 0, pair._2.getLength(), charset);
        });
        rdd.collect();
    }
}

@Test
public void getTextWithCharset() throws UnsupportedEncodingException, FileNotFoundException {
    SparkSession sparkSession = getOrCreateSparkSession();
    JavaSparkContext sparkContext = JavaSparkContext.fromSparkContext(sparkSession.sparkContext());

    withCharset2(sparkContext, partnerFile8859, charset8859);
}
 

но выбрасывает эту ошибку:

ДИСПЕТЧЕР задач ОШИБОК: задача 1.0 на этапе 0.0 (TID 1) привела к несериализуемому результату: стек сериализации org.apache.hadoop.io.LongWritable: — объект, не сериализуемый (класс: org.apache.hadoop.io.LongWritable, значение: 1166) — поле (класс: scala.Tuple2, имя: _1, тип: класс java.lang.Объект) — объект (класс scala.Tuple2, (1166,»53″;»S?cio sem Capital»)) — элемент массива (индекс: 0) — массив (класс [Lscala.Tuple2;, размер 44); не повторяется

Я пытаюсь перенести пример из https://github.com/databricks/spark-csv/blob/master/src/main/scala/com/databricks/spark/csv/util/TextFile.scala к API Java Spark

Ответ №1:

Я исправляю это, делая это:

     public void withCharset2(JavaSparkContext context, String location, String charset) throws UnsupportedEncodingException
{
    if (Charset.forName(charset) == DEFAULT_CHARSET) {
        JavaRDD<String> result = context.textFile(location,1);
    } else {
        //val file = sparkContext.hadoopFile[LongWritable, Text, TextInputFormat](path)
        ////return new String(pair._2.getBytes(), 0, pair._2.getLength(), charset);
        // can't pass a Charset object here cause its not serializable
        // TODO: maybe use mapPartitions instead?
        JavaPairRDD<LongWritable,Text> rdd = context.hadoopFile(location, TextInputFormat.class, LongWritable.class, Text.class);

        JavaRDD<Text> values = rdd.values();
        JavaRDD<String> textRDD = values.map(text -> {
            String s = new String(text.getBytes(), 0, text.getLength(), charset);
            System.out.printf("textRDD map: %s | orignal string: %s n",s, text.toString());
            //textRDD map: "08";"Conselheiro de Administração" | orignal string: "08";"Conselheiro de Administra??o"
            return s;
        });

        textRDD.collect();
    }
}