#scala #exception #serialization #hashmap #apache-flink
#scala #исключение #сериализация #hashmap #apache-flink
Вопрос:
Учитывая этот фиктивный код:
1 case class MyObject(values:mutable.LinkedHashMap[String, String])
...
2 implicit val typeInfoString:TypeInformation[String] = TypeInformation.of(classOf[String])
3 implicit val typeInfoMyObject:TypeInformation[MyObject] = TypeInformation.of(classOf[MyObject])
4
5 val env = StreamExecutionEnvironment.getExecutionEnvironment
6
7 env
8 .fromElements("one")
9 .map(str =>
10 {
11 val obj = MyObject(mutable.LinkedHashMap("key" -> str))
12 val filteredMap1:mutable.LinkedHashMap[String, String] = obj.values.filter(!_._2.contains("bla"))
13
14 obj
15 })
16 .map(obj =>
17 {
18 val filteredMap2:mutable.LinkedHashMap[String, String] = obj.values.filter(!_._2.contains("bla"))
19
20 obj
21 })
Приложение завершит работу в строке 18 с исключением:
Caused by: java.lang.ClassCastException: scala.collection.mutable.HashMap cannot be cast to scala.collection.mutable.LinkedHashMap
Проблемы, по-видимому, заключаются в том, что посредством сериализации / десериализации values
член изменяет свой тип объекта или, другими словами, LinkedHashMap
превращается в HashMap
.
Обратите внимание, что тот же код, что и в строке 18, отлично работает в строке 12.
При установке точки останова в строке 12, obj.values
будет отображаться как LinkedHashMap
в отладчике / IntelliJ, однако точка останова в строке 18 будет отображаться obj.values
как HashMap
в отладчике.
Что здесь происходит? Как я могу это исправить? В конце концов, LinkedHashMap
реализует Serializable
?!
Ответ №1:
Сериализатор Kryo Chill по умолчанию для LinkedHashMap
не сохраняет тип карты и вместо этого десериализует данные в a HashMap
. Чтобы избежать этого, необходимо создать сериализатор для LinkedHashMap
типа:
class LinkedHashMapSerializer[K, V] extends Serializer[mutable.LinkedHashMap[K, V]] with Serializable {
override def write(kryo: Kryo, output: Output, `object`: mutable.LinkedHashMap[K, V]): Unit = {
kryo.writeObject(output, `object`.size)
for (elem <- `object`.iterator) {
kryo.writeClassAndObject(output, elem._1)
kryo.writeClassAndObject(output, elem._2)
}
}
override def read(kryo: Kryo, input: Input, `type`: Class[mutable.LinkedHashMap[K, V]]): mutable.LinkedHashMap[K, V] = {
val result = new mutable.LinkedHashMap[K, V]()
val size = kryo.readObject(input, classOf[Int])
for (_ <- 1 to size) {
val key = kryo.readClassAndObject(input).asInstanceOf[K]
val value = kryo.readClassAndObject(input).asInstanceOf[V]
result.put(key, value)
}
result
}
}
А затем зарегистрируйте его как Kryo Serializer
:
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.registerTypeWithKryoSerializer(classOf[mutable.LinkedHashMap[String, String]], new LinkedHashMapSerializer())
Комментарии:
1. Спасибо за ваш ответ. С пользовательским сериализатором это, похоже, работает. Однако я не уверен, для чего
val serializer = new TraversableSerializer ...
это нужно? Похоже, он не используется. Кроме того, когда я импортируюorg.apache.flink.api.scala.typeutils.TraversableSerializer
, он жалуется наfalse
параметр (он ожидает параметр типаTypeSerializer
).2. Извините, этого не должно было быть. Я исправил второй фрагмент.
3. Хорошо, на самом деле это работает для моего минимального примера, однако он не работает с моим реальным кодом. В фактической реализации
LinkedHashMap
скрыт в более сложной структуре класса case, и запуск с вашими изменениями теперь вызываетExceptionInChainedOperatorException
вызванный aKryoException
— он жалуется на «Невозможно найти класс» (мои задействованные классы case). Означает ли это, что мне придется теперь предоставлять пользовательские сериализаторы для всех моих задействованных классов case и промежуточных классов? Я имею в виду, что это работало до изменений.4. Сделано несколько более простых тестов, и кажется, что пользовательский сериализатор не может обрабатывать вложенные типы внутри
LinkedHashMap
. Он выдает, напримерcom.esotericsoftware.kryo.KryoException: Unable to find class: Xscala.None$
, и, похоже, только для любого типа, отличного от POJO.5. Я
Serializer
еще раз обновил реализацию, чтобы использоватьkryo.writeClassAndObject
вместоkryo.writeObject
. Надеюсь, теперь это работает.