#scala #apache-flink #avro #flink-streaming
#scala #apache-flink #avro #flink-потоковая передача
Вопрос:
У нас есть задание Flink, написанное на Scala с использованием классов case (сгенерированных из файлов avsc avrohugger) для представления нашего состояния. Мы хотели бы использовать Avro для сериализации нашего состояния, чтобы миграция состояний работала при обновлении наших моделей. Мы поняли, что с Flink 1.7 сериализация Avro поддерживается OOTB. Мы добавили модуль flink-avro в classpath, но при восстановлении из сохраненного снимка мы замечаем, что он все еще пытается использовать сериализацию Kryo. Соответствующий фрагмент кода
case class Foo(id: String, timestamp: java.time.Instant)
val env = StreamExecutionEnvironment.getExecutionEnvironment
val conf = env.getConfig
conf.disableForceKryo()
conf.enableForceAvro()
val rawDataStream: DataStream[String] = env.addSource(MyFlinkKafkaConsumer)
val parsedDataSteam: DataStream[Foo] = rawDataStream.flatMap(new JsonParser[Foo])
// do something useful with it
env.execute("my-job")
При выполнении миграции состояния на Foo
(например, путем добавления поля и развертывания задания) Я вижу, что он пытается десериализовать с помощью Kryo, что, очевидно, не удается. Как я могу убедиться, что используется сериализация Avro?
Обновить
Узнал о https://issues.apache.org/jira/browse/FLINK-10897, таким образом, сериализация состояния POJO с помощью Avro поддерживается только с версии 1.8 afaik. Я попробовал это, используя последнюю версию RC версии 1.8 с простым POJO WordCount, который расширяется из SpecificRecord:
/** MACHINE-GENERATED FROM AVRO SCHEMA. DO NOT EDIT DIRECTLY */
import scala.annotation.switch
case class WordWithCount(var word: String, var count: Long) extends
org.apache.avro.specific.SpecificRecordBase {
def this() = this("", 0L)
def get(field$: Int): AnyRef = {
(field$: @switch) match {
case 0 => {
word
}.asInstanceOf[AnyRef]
case 1 => {
count
}.asInstanceOf[AnyRef]
case _ => new org.apache.avro.AvroRuntimeException("Bad index")
}
}
def put(field$: Int, value: Any): Unit = {
(field$: @switch) match {
case 0 => this.word = {
value.toString
}.asInstanceOf[String]
case 1 => this.count = {
value
}.asInstanceOf[Long]
case _ => new org.apache.avro.AvroRuntimeException("Bad index")
}
()
}
def getSchema: org.apache.avro.Schema = WordWithCount.SCHEMA$
}
object WordWithCount {
val SCHEMA$ = new org.apache.avro.Schema.Parser().parse(" .
{"type":"record","name":"WordWithCount","fields":
[{"name":"word","type":"string"},
{"name":"count","type":"long"}]}")
}
Это, однако, также не сработало из коробки. Затем мы попытались определить нашу собственную информацию о типе, используя AvroTypeInfo от flink-avro, но это не удается, потому что Avro ищет свойство SCHEMA $ (SpecificData: 285) в классе и не может использовать отражение Java для идентификации SCHEMA $ в сопутствующем объекте Scala.
Ответ №1:
Я никогда не мог заставить reflection работать из-за того, что поля Scala были закрытыми под капотом. AFAIK единственное решение — обновить Flink, чтобы использовать конструкторы avro, не основанные на отражении, в AvroInputFormat (сравнить).
В крайнем случае, кроме Java, можно было бы вернуться к GenericRecord от avro, возможно, использовать avro4s для генерации их из Standard
формата avrohugger (обратите внимание, что Avro4s сгенерирует свою собственную схему из сгенерированных типов Scala)