LinkedHashMap изменяется на HashMap и сбой в операторах потока данных flink

#scala #exception #serialization #hashmap #apache-flink

#scala #исключение #сериализация #hashmap #apache-flink

Вопрос:

Учитывая этот фиктивный код:

  1 case class MyObject(values:mutable.LinkedHashMap[String, String])

...

 2    implicit val typeInfoString:TypeInformation[String] = TypeInformation.of(classOf[String])
 3    implicit val typeInfoMyObject:TypeInformation[MyObject] = TypeInformation.of(classOf[MyObject])
 4
 5    val env = StreamExecutionEnvironment.getExecutionEnvironment
 6
 7    env
 8      .fromElements("one")
 9      .map(str =>
10      {
11        val obj = MyObject(mutable.LinkedHashMap("key" -> str))
12        val filteredMap1:mutable.LinkedHashMap[String, String] = obj.values.filter(!_._2.contains("bla"))
13
14        obj
15      })
16      .map(obj =>
17      {
18        val filteredMap2:mutable.LinkedHashMap[String, String] = obj.values.filter(!_._2.contains("bla"))
19
20        obj
21      })
  

Приложение завершит работу в строке 18 с исключением:

 Caused by: java.lang.ClassCastException: scala.collection.mutable.HashMap cannot be cast to scala.collection.mutable.LinkedHashMap
  

Проблемы, по-видимому, заключаются в том, что посредством сериализации / десериализации values член изменяет свой тип объекта или, другими словами, LinkedHashMap превращается в HashMap .

Обратите внимание, что тот же код, что и в строке 18, отлично работает в строке 12.

При установке точки останова в строке 12, obj.values будет отображаться как LinkedHashMap в отладчике / IntelliJ, однако точка останова в строке 18 будет отображаться obj.values как HashMap в отладчике.

Что здесь происходит? Как я могу это исправить? В конце концов, LinkedHashMap реализует Serializable ?!

Ответ №1:

Сериализатор Kryo Chill по умолчанию для LinkedHashMap не сохраняет тип карты и вместо этого десериализует данные в a HashMap . Чтобы избежать этого, необходимо создать сериализатор для LinkedHashMap типа:

 class LinkedHashMapSerializer[K, V] extends Serializer[mutable.LinkedHashMap[K, V]] with Serializable {
  override def write(kryo: Kryo, output: Output, `object`: mutable.LinkedHashMap[K, V]): Unit = {
    kryo.writeObject(output, `object`.size)

    for (elem <- `object`.iterator) {
      kryo.writeClassAndObject(output, elem._1)
      kryo.writeClassAndObject(output, elem._2)
    }
  }

  override def read(kryo: Kryo, input: Input, `type`: Class[mutable.LinkedHashMap[K, V]]): mutable.LinkedHashMap[K, V] = {
    val result = new mutable.LinkedHashMap[K, V]()
    val size = kryo.readObject(input, classOf[Int])
    for (_ <- 1 to size) {
      val key = kryo.readClassAndObject(input).asInstanceOf[K]
      val value = kryo.readClassAndObject(input).asInstanceOf[V]
      result.put(key, value)
    }

    result
  }
}
  

А затем зарегистрируйте его как Kryo Serializer :

 val env = StreamExecutionEnvironment.getExecutionEnvironment
env.registerTypeWithKryoSerializer(classOf[mutable.LinkedHashMap[String, String]], new LinkedHashMapSerializer())
  

Комментарии:

1. Спасибо за ваш ответ. С пользовательским сериализатором это, похоже, работает. Однако я не уверен, для чего val serializer = new TraversableSerializer ... это нужно? Похоже, он не используется. Кроме того, когда я импортирую org.apache.flink.api.scala.typeutils.TraversableSerializer , он жалуется на false параметр (он ожидает параметр типа TypeSerializer ).

2. Извините, этого не должно было быть. Я исправил второй фрагмент.

3. Хорошо, на самом деле это работает для моего минимального примера, однако он не работает с моим реальным кодом. В фактической реализации LinkedHashMap скрыт в более сложной структуре класса case, и запуск с вашими изменениями теперь вызывает ExceptionInChainedOperatorException вызванный a KryoException — он жалуется на «Невозможно найти класс» (мои задействованные классы case). Означает ли это, что мне придется теперь предоставлять пользовательские сериализаторы для всех моих задействованных классов case и промежуточных классов? Я имею в виду, что это работало до изменений.

4. Сделано несколько более простых тестов, и кажется, что пользовательский сериализатор не может обрабатывать вложенные типы внутри LinkedHashMap . Он выдает, например com.esotericsoftware.kryo.KryoException: Unable to find class: Xscala.None$ , и, похоже, только для любого типа, отличного от POJO.

5. Я Serializer еще раз обновил реализацию, чтобы использовать kryo.writeClassAndObject вместо kryo.writeObject . Надеюсь, теперь это работает.