Проблема сериализации при использовании Spark3.1.2 с hadoop yarn

#java #apache-spark #hadoop #serialization

Вопрос:

Моя работа spark завершается с этой ошибкой:

Исключение org.apache.spark.SparkException: Задание прервано из-за сбоя этапа: Задание 0 на этапе 0.0 не удалось выполнить 4 раза, последний сбой: Потеряно задание 0.3 на этапе 0.0 (TID 3) (исполнитель davben-lubuntu 2): java.lang.Исключение ClassCastException: не удается назначить экземпляр java.lang.invoke.Сериализованный lambda в поле org.apache.spark.rdd.MapPartitionsRDD.f типа scala.Функция 3 в экземпляре org.apache.spark.rdd.MapPartitionsRDD

Моя ОС Linux Ubuntu 20 организована следующим образом: у меня два пользователя: /home/davben и /home/hadoop. В пользователя hadoop я установил hadoop 3.1 и spark-3.1.2-hadoop3.2. Оба пользователя ссылаются на установку Java-8-openjdk. Задание Spark запускается пользователем davben в среде разработки eclipse следующим образом: я создаю spark conf и сеанс spark

 System.setProperty("hadoop.home.dir", "/home/hadoop/hadoop");
SparkConf sparkConf = new SparkConf()
.setAppName("simple")
.setMaster("yarn")
.set("spark.executor.memory", "1g")
.set("deploy.mode", "cluster")
.set("spark.yarn.stagingDir", "hdfs://localhost:9000/user/hadoop/") .set("spark.hadoop.fs.defaultFS","hdfs://localhost:9000") .set("spark.hadoop.yarn.resourcemanager.hostname","localhost") .set("spark.hadoop.yarn.resourcemanager.scheduler.address","localhost:8030") .set("spark.hadoop.yarn.resourcemanager.address ","localhost:8032") .set("spark.hadoop.yarn.resourcemanager.webapp.address","localhost:8088") .set("spark.hadoop.yarn.resourcemanager.admin.address","localhost:8083")
SparkSession spark = SparkSession.builder().config(sparkConf).getOrCreate();
 

Затем я создаю набор данных с двумя записями:

 List<Row> rows = new ArrayList<>(); 
rows.add(RowFactory.create("a", "b"));
rows.add(RowFactory.create("a", "a"));
StructType structType = new StructType(); 
structType = structType.add("edge_1", DataTypes.StringType, false);
structType = structType.add("edge_2", DataTypes.StringType, false); ExpressionEncoder<Row> edgeEncoder = RowEncoder.apply(structType);
Dataset<Row> edge = spark.createDataset(rows, edgeEncoder);
 

Затем я печатаю содержимое текущего края набора данных

  edge.show();
 

Затем я выполняю преобразование карты на краю, которое записывает значения двух записей в верхнем регистре и возвращает результат в edge2

Набор данных edge2 = edge.map(новая функция MyFunction2(), edgeEncoder); Ниже приведен код функции MyFunction2

 public class MyFunction2 implements MapFunction<Row, Row>, scala.Serializable { 
private static final long serialVersionUID = 1L;

@Override public Row call(Row v1) throws Exception { 
String el1 = v1.get(0).toString().toUpperCase(); 
String el2 = v1.get(1).toString().toUpperCase(); 
return RowFactory.create(el1,el2); 
}
}
 

Наконец, я показываю содержимое edge2

 edge2.show();
 

Я могу подтвердить это, проверив пользовательский интерфейс hadoop на локальном хосте:8088, задание отправлено правильно, и
что звучит странно, так это то, что первое шоу возвращается правильно в моей консоли, но второе не возвращает указанную ошибку.