#scala #serialization #apache-flink #flink-streaming
Вопрос:
Привет, я пытаюсь запустить приложение flink scala, которое читает из кафки, применяет некоторые преобразования поиска, а затем записывает в кафку.
Flink Версия 1.12.1
Я протестировал его на местном, и он отлично работает. Но когда я пытаюсь запустить его в кластере, используя встроенную интеграцию kubernetes, я вижу странные ошибки, как показано ниже.
Кластер также выглядит нормально, потому что я попытался запустить приложение wordcount в кластере, и оно отлично сработало.
Исключение неясно, а также трассировка стека показывает трассировку стека taskmanager и, следовательно, не имеет представления о том, где в приложении может быть проблема. Может ли это быть проблемой сериализации? Есть ли способ отладить такие проблемы и найти фактическую точку в коде приложения, в которой возникла проблема?
org.apache.flink.streaming.runtime.tasks.StreamTaskException: Could not instantiate serializer.
at org.apache.flink.streaming.api.graph.StreamConfig.getTypeSerializerOut(StreamConfig.java:216) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
at org.apache.flink.streaming.runtime.tasks.OperatorChain.createStreamOutput(OperatorChain.java:664) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
at org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainOutputs(OperatorChain.java:250) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:160) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:485) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:533) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722) [flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547) [flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
at java.lang.Thread.run(Unknown Source) [?:?]
Caused by: java.io.IOException: unexpected exception type
at java.io.ObjectStreamClass.throwMiscException(Unknown Source) ~[?:?]
at java.io.ObjectStreamClass.invokeReadObject(Unknown Source) ~[?:?]
at java.io.ObjectInputStream.readSerialData(Unknown Source) ~[?:?]
at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source) ~[?:?]
at java.io.ObjectInputStream.readObject0(Unknown Source) ~[?:?]
at java.io.ObjectInputStream.readArray(Unknown Source) ~[?:?]
at java.io.ObjectInputStream.readObject0(Unknown Source) ~[?:?]
at java.io.ObjectInputStream.defaultReadFields(Unknown Source) ~[?:?]
at java.io.ObjectInputStream.readSerialData(Unknown Source) ~[?:?]
at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source) ~[?:?]
at java.io.ObjectInputStream.readObject0(Unknown Source) ~[?:?]
at java.io.ObjectInputStream.readObject(Unknown Source) ~[?:?]
at java.io.ObjectInputStream.readObject(Unknown Source) ~[?:?]
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:576) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:562) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:550) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:511) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
at org.apache.flink.streaming.api.graph.StreamConfig.getTypeSerializerOut(StreamConfig.java:214) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
... 8 more
Caused by: java.util.concurrent.ExecutionException: java.lang.ClassNotFoundException: __wrapper$1$7aa8fcbe22114421a688e120fcde1df7.__wrapper$1$7aa8fcbe22114421a688e120fcde1df7$
at org.apache.flink.shaded.guava18.com.google.common.util.concurrent.AbstractFuture$Sync.getValue(AbstractFuture.java:299) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
at org.apache.flink.shaded.guava18.com.google.common.util.concurrent.AbstractFuture$Sync.get(AbstractFuture.java:286) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
at org.apache.flink.shaded.guava18.com.google.common.util.concurrent.AbstractFuture.get(AbstractFuture.java:116) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
at org.apache.flink.shaded.guava18.com.google.common.util.concurrent.Uninterruptibles.getUninterruptibly(Uninterruptibles.java:137) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$LoadingValueReference.waitForValue(LocalCache.java:3557) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.waitForLoadingValue(LocalCache.java:2302) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2289) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2197) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache.get(LocalCache.java:3937) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$LocalManualCache.get(LocalCache.java:4739) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
at org.apache.flink.api.scala.typeutils.TraversableSerializer$.compileCbf(TraversableSerializer.scala:184) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
at org.apache.flink.api.scala.typeutils.TraversableSerializer.compileCbf(TraversableSerializer.scala:51) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
at org.apache.flink.api.scala.typeutils.TraversableSerializer.readObject(TraversableSerializer.scala:72) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
at jdk.internal.reflect.GeneratedMethodAccessor77.invoke(Unknown Source) ~[?:?]
at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source) ~[?:?]
at java.lang.reflect.Method.invoke(Unknown Source) ~[?:?]
at java.io.ObjectStreamClass.invokeReadObject(Unknown Source) ~[?:?]
at java.io.ObjectInputStream.readSerialData(Unknown Source) ~[?:?]
at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source) ~[?:?]
at java.io.ObjectInputStream.readObject0(Unknown Source) ~[?:?]
at java.io.ObjectInputStream.readArray(Unknown Source) ~[?:?]
at java.io.ObjectInputStream.readObject0(Unknown Source) ~[?:?]
at java.io.ObjectInputStream.defaultReadFields(Unknown Source) ~[?:?]
at java.io.ObjectInputStream.readSerialData(Unknown Source) ~[?:?]
at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source) ~[?:?]
at java.io.ObjectInputStream.readObject0(Unknown Source) ~[?:?]
at java.io.ObjectInputStream.readObject(Unknown Source) ~[?:?]
at java.io.ObjectInputStream.readObject(Unknown Source) ~[?:?]
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:576) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:562) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:550) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:511) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
at org.apache.flink.streaming.api.graph.StreamConfig.getTypeSerializerOut(StreamConfig.java:214) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
... 8 more
Caused by: java.lang.ClassNotFoundException: __wrapper$1$7aa8fcbe22114421a688e120fcde1df7.__wrapper$1$7aa8fcbe22114421a688e120fcde1df7$
at scala.reflect.internal.util.AbstractFileClassLoader.findClass(AbstractFileClassLoader.scala:64) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
at java.lang.ClassLoader.loadClass(Unknown Source) ~[?:?]
at java.lang.ClassLoader.loadClass(Unknown Source) ~[?:?]
at java.lang.Class.forName0(Native Method) ~[?:?]
at java.lang.Class.forName(Unknown Source) ~[?:?]
at scala.tools.reflect.ToolBoxFactory$ToolBoxImpl$ToolBoxGlobal.compile(ToolBoxFactory.scala:261) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
at scala.tools.reflect.ToolBoxFactory$ToolBoxImpl.$anonfun$compile$13(ToolBoxFactory.scala:433) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
at scala.tools.reflect.ToolBoxFactory$ToolBoxImpl$withCompilerApi$.apply(ToolBoxFactory.scala:359) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
at scala.tools.reflect.ToolBoxFactory$ToolBoxImpl.compile(ToolBoxFactory.scala:426) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
at org.apache.flink.api.scala.typeutils.TraversableSerializer$LazyRuntimeCompiler.compileCbfInternal(TraversableSerializer.scala:230) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
at org.apache.flink.api.scala.typeutils.TraversableSerializer$LazyRuntimeCompiler.call(TraversableSerializer.scala:220) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
at org.apache.flink.api.scala.typeutils.TraversableSerializer$LazyRuntimeCompiler.call(TraversableSerializer.scala:216) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$LocalManualCache$1.load(LocalCache.java:4742) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3527) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2319) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2282) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2197) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache.get(LocalCache.java:3937) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$LocalManualCache.get(LocalCache.java:4739) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
at org.apache.flink.api.scala.typeutils.TraversableSerializer$.compileCbf(TraversableSerializer.scala:184) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
at org.apache.flink.api.scala.typeutils.TraversableSerializer.compileCbf(TraversableSerializer.scala:51) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
at org.apache.flink.api.scala.typeutils.TraversableSerializer.readObject(TraversableSerializer.scala:72) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
at jdk.internal.reflect.GeneratedMethodAccessor77.invoke(Unknown Source) ~[?:?]
at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source) ~[?:?]
at java.lang.reflect.Method.invoke(Unknown Source) ~[?:?]
at java.io.ObjectStreamClass.invokeReadObject(Unknown Source) ~[?:?]
at java.io.ObjectInputStream.readSerialData(Unknown Source) ~[?:?]
at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source) ~[?:?]
at java.io.ObjectInputStream.readObject0(Unknown Source) ~[?:?]
at java.io.ObjectInputStream.readArray(Unknown Source) ~[?:?]
at java.io.ObjectInputStream.readObject0(Unknown Source) ~[?:?]
at java.io.ObjectInputStream.defaultReadFields(Unknown Source) ~[?:?]
at java.io.ObjectInputStream.readSerialData(Unknown Source) ~[?:?]
at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source) ~[?:?]
at java.io.ObjectInputStream.readObject0(Unknown Source) ~[?:?]
at java.io.ObjectInputStream.readObject(Unknown Source) ~[?:?]
at java.io.ObjectInputStream.readObject(Unknown Source) ~[?:?]
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:576) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:562) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:550) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:511) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
at org.apache.flink.streaming.api.graph.StreamConfig.getTypeSerializerOut(StreamConfig.java:214) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
... 8 more```
Ответ №1:
Я предполагал, что проблема была в порядке загрузки классов.
Я пытался развернуть в режиме приложения. До того, как у меня был jar пользовательского кода и зависимые библиотеки в /opt/flink/usrlib. Я попытался следовать этому руководству https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/resource-providers/native_kubernetes/#application-mode и использовал /usrlib, чтобы положить его в банки. Я видел ошибку выше.
Теперь я перешел на /opt/flink/lib, а пользовательский код и зависимости с библиотеками flink находятся в одном и том же каталоге внутри образа docker.
Изменено COPY .dist/poc-flink-0.0.1.jar $FLINK_HOME/usrlib/poc-flink.jar
на COPY .dist/poc-flink-0.0.1.jar $FLINK_HOME/lib/poc-flink.jar
Так, чтобы использовался тот же загрузчик классов. И это решило проблему.