Failed to deserialize Avro record: Apache Flink SQL CLI

#apache-kafka #apache-flink #flink-sql

Question:

I am publishing Avro-serialized data to a Kafka topic and then trying to create a Flink table from that topic via the SQL CLI. I can create the table, but I cannot view the topic data after executing a SQL SELECT statement. However, I can deserialize and print the published data using a simple Kafka consumer. I get this error in the SQL CLI:

 Flink SQL> SELECT * FROM test_flink2;
[ERROR] Could not execute SQL statement. Reason:
java.lang.ArrayIndexOutOfBoundsException: Index -3 out of bounds for length 2
 

Table creation

 Flink SQL> CREATE TABLE test_flink2 (
> `name` STRING,
> `address` STRING)
> WITH (
> 'connector' = 'kafka',
> 'topic' = 'test_flink2',
> 'scan.startup.mode' = 'earliest-offset',
> 'properties.bootstrap.servers' = 'krypton04.psc:9092',
> 'format' = 'avro');
[INFO] Table has been created.
 

Table definition

 Flink SQL> DESC test_flink2;
+---------+--------+------+-----+--------+-----------+
|    name |   type | null | key | extras | watermark |
+---------+--------+------+-----+--------+-----------+
|    name | STRING | true |     |        |           |
| address | STRING | true |     |        |           |
+---------+--------+------+-----+--------+-----------+
2 rows in set
 

Avro schema

 {
  "name": "MyClass",
  "type": "record",
  "namespace": "myns",
  "fields": [
    {
      "name": "name",
      "type": "string"
    },
    {
      "name": "address",
      "type": "string"
    }
  ]
}
 

Message value (there is no message key)

     value = {'name' : 'vikram',
            'address' : 'hyd'}
 

I keep sending this same message value to the test_flink2 topic using a simple Kafka producer.

Kafka topic description

 Topic: reddyvel_test_flink2 PartitionCount: 1   ReplicationFactor: 3    Configs: 
    Topic: reddyvel_test_flink2 Partition: 0    Leader: 1   Replicas: 1,4,5 Isr: 1,4,5
 

Full error log

From flink-*-sql-client-*.log

 2021-02-05 09:10:01,351 WARN  org.apache.flink.table.client.cli.CliClient                  [] - Could not execute SQL statement.
org.apache.flink.table.client.gateway.SqlExecutionException: Error while retrieving result.
        at org.apache.flink.table.client.gateway.local.result.CollectStreamResult.lambda$startRetrieval$0(CollectStreamResult.java:96) ~[flink-sql-client_2.12-1.12.1.jar:1.12.1]
        at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859) ~[?:?]
        at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837) ~[?:?]
        at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) ~[?:?]
        at java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:610) ~[?:?]
        at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:840) ~[?:?]
        at java.util.concurrent.CompletableFuture$Completion.exec(CompletableFuture.java:479) ~[?:?]
        at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290) ~[?:?]
        at java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1020) ~[?:?]
        at java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1656) ~[?:?]
        at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1594) ~[?:?]
        at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:183) ~[?:?]
Caused by: java.util.concurrent.CompletionException: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: 583e8c2eb20eb8d8bdedba04673bb297)
        at org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:125) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:642) ~[?:?]
        at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) ~[?:?]
        at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073) ~[?:?]
        at org.apache.flink.client.program.rest.RestClusterClient.lambda$pollResourceAsync$22(RestClusterClient.java:665) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859) ~[?:?]
        at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837) ~[?:?]
        at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) ~[?:?]
        at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073) ~[?:?]
        at org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$9(FutureUtils.java:394) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859) ~[?:?]
        at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837) ~[?:?]
        at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) ~[?:?]
        at java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:610) ~[?:?]
        at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:1085) ~[?:?]
        at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:478) ~[?:?]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) ~[?:?]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) ~[?:?]
        at java.lang.Thread.run(Thread.java:834) ~[?:?]
Caused by: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: 583e8c2eb20eb8d8bdedba04673bb297)
        at org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:125) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:642) ~[?:?]
        at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) ~[?:?]
        at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073) ~[?:?]
        at org.apache.flink.client.program.rest.RestClusterClient.lambda$pollResourceAsync$22(RestClusterClient.java:665) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859) ~[?:?]
        at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837) ~[?:?]
        at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) ~[?:?]
        at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073) ~[?:?]
        at org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$9(FutureUtils.java:394) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859) ~[?:?]
        at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837) ~[?:?]
        at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) ~[?:?]
        at java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:610) ~[?:?]
        at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:1085) ~[?:?]
        at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:478) ~[?:?]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) ~[?:?]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) ~[?:?]
        at java.lang.Thread.run(Thread.java:834) ~[?:?]
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
        at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:123) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:642) ~[?:?]
        at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) ~[?:?]
        at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073) ~[?:?]
        at org.apache.flink.client.program.rest.RestClusterClient.lambda$pollResourceAsync$22(RestClusterClient.java:665) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859) ~[?:?]
        at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837) ~[?:?]
        at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) ~[?:?]
        at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073) ~[?:?]
        at org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$9(FutureUtils.java:394) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859) ~[?:?]
        at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837) ~[?:?]
        at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) ~[?:?]
        at java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:610) ~[?:?]
        at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:1085) ~[?:?]
        at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:478) ~[?:?]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) ~[?:?]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) ~[?:?]
        at java.lang.Thread.run(Thread.java:834) ~[?:?]
Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
        at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:118) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:80) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:233) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:224) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:215) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:665) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:89) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:447) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at jdk.internal.reflect.GeneratedMethodAccessor42.invoke(Unknown Source) ~[?:?]
        at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:?]
        at java.lang.reflect.Method.invoke(Method.java:566) ~[?:?]
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:306) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:213) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:159) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at scala.PartialFunction.applyOrElse(PartialFunction.scala:123) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at akka.actor.Actor.aroundReceive(Actor.scala:517) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at akka.actor.Actor.aroundReceive$(Actor.scala:515) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at akka.actor.ActorCell.invoke(ActorCell.scala:561) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at akka.dispatch.Mailbox.run(Mailbox.scala:225) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at akka.dispatch.Mailbox.exec(Mailbox.scala:235) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
Caused by: java.io.IOException: Failed to deserialize Avro record.
        at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:104) ~[?:?]
        at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:44) ~[?:?]
        at org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:82) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaDeserializationSchema.deserialize(DynamicKafkaDeserializationSchema.java:113) ~[?:?]
        at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:177) ~[?:?]
        at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.runFetchLoop(KafkaFetcher.java:137) ~[?:?]
        at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:761) ~[?:?]
        at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:241) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
Caused by: java.lang.ArrayIndexOutOfBoundsException: Index -3 out of bounds for length 2
        at org.apache.flink.avro.shaded.org.apache.avro.io.parsing.Symbol$Alternative.getSymbol(Symbol.java:460) ~[?:?]
        at org.apache.flink.avro.shaded.org.apache.avro.io.ResolvingDecoder.readIndex(ResolvingDecoder.java:283) ~[?:?]
        at org.apache.flink.avro.shaded.org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:187) ~[?:?]
        at org.apache.flink.avro.shaded.org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160) ~[?:?]
        at org.apache.flink.avro.shaded.org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:259) ~[?:?]
        at org.apache.flink.avro.shaded.org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:247) ~[?:?]
        at org.apache.flink.avro.shaded.org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:179) ~[?:?]
        at org.apache.flink.avro.shaded.org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160) ~[?:?]
        at org.apache.flink.avro.shaded.org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153) ~[?:?]
        at org.apache.flink.formats.avro.AvroDeserializationSchema.deserialize(AvroDeserializationSchema.java:137) ~[?:?]
        at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:101) ~[?:?]
        at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:44) ~[?:?]
        at org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:82) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaDeserializationSchema.deserialize(DynamicKafkaDeserializationSchema.java:113) ~[?:?]
        at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:177) ~[?:?]
        at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.runFetchLoop(KafkaFetcher.java:137) ~[?:?]
        at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:761) ~[?:?]
        at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:241) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
 

I cannot figure out the cause of the problem here.

Thanks.

Comments:

1. How was this data produced? If you are using Confluent tools, you need to use Flink's Confluent Avro deserializer.

2. @OneCricketeer I am using the confluent kafka python API to send the messages, with MessageSerializer to serialize them ( github.com/confluentinc/confluent-kafka-python/blob /… )

Answer #1:

"using the confluent kafka python API to send the messages"

Then you should use Flink's Confluent Avro deserializer.

Your error occurs because you are trying to read the data as plain Avro, which requires the schema to be part of the message (it cannot find it, so it throws an array index out-of-bounds exception).
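
To make the mismatch concrete, here is a minimal Python sketch (standard library only) of the framing that Confluent-style serializers such as the MessageSerializer mentioned in the comments put in front of the Avro payload: one magic byte 0 followed by a 4-byte big-endian schema ID. The schema ID 42 and the helper names are made up for illustration. A reader that expects plain Avro starts decoding at the first byte, so it interprets these header bytes as record data. The last line shows that an Avro int encoded as the byte 0x05 decodes to -3, the value seen in the "Index -3" error; which byte of the real messages produced it cannot be determined from the question.

```python
import struct

MAGIC_BYTE = 0  # first byte of every Confluent-framed message


def zigzag_decode(n: int) -> int:
    """Undo the zigzag mapping that Avro applies to int and long values."""
    return (n >> 1) ^ -(n & 1)


def avro_string(text: str) -> bytes:
    """Avro binary encoding of a short string: zigzag length, then UTF-8 bytes.

    Simplification: only handles strings shorter than 64 bytes, where the
    zigzag-encoded length fits into a single varint byte.
    """
    data = text.encode("utf-8")
    if len(data) >= 64:
        raise ValueError("sketch only supports strings shorter than 64 bytes")
    return bytes([len(data) << 1]) + data


def confluent_frame(schema_id: int, avro_payload: bytes) -> bytes:
    """Prepend the 5-byte header: magic byte plus big-endian schema ID."""
    return struct.pack(">bI", MAGIC_BYTE, schema_id) + avro_payload


# The record from the question, encoded with the writer schema (two strings).
payload = avro_string("vikram") + avro_string("hyd")
# 42 is a hypothetical schema ID chosen for this example.
message = confluent_frame(42, payload)

print(payload.hex())       # bytes a plain Avro reader expects to see first
print(message.hex())       # bytes actually on the topic, header included
print(zigzag_decode(0x05))  # an Avro int stored as 0x05 decodes to -3
```

The first five bytes of `message` are not part of the Avro record at all, which is why a simple consumer that uses the matching Confluent-style deserializer can read the data while a plain Avro reader cannot.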

Comments:

1. But we do not use the Confluent Schema Registry to load schemas. We have implemented our own file-based schema registry, which loads Avro schemas from the file system. So the Confluent Avro deserializer may not work, since it requires a Confluent Schema Registry URL. Any thoughts on how to solve this?

2. The Python class you linked to requires a schema registry, specifically the Confluent implementation. If you have something else, you need to write a matching implementation against the Flink API, or embed the schema in the message so that the standard Avro deserializers can read it.
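
A custom deserializer of that kind would have to undo the framing before decoding the payload with the writer schema. The following Python sketch (standard library only) shows just that decoding logic for the two-string record from the question. It is not Flink API code: the function names and the schema ID 42 are hypothetical, and a real implementation would look the schema up by ID in the file-based registry instead of hard-coding the two fields.

```python
import struct
from io import BytesIO


def read_long(buf: BytesIO) -> int:
    """Read one Avro long: a base-128 varint, then zigzag-decode it."""
    shift = 0
    raw = 0
    while True:
        chunk = buf.read(1)
        if not chunk:
            raise EOFError("truncated varint")
        byte = chunk[0]
        raw |= (byte & 0x7F) << shift
        if not byte & 0x80:  # high bit clear: this was the last varint byte
            break
        shift += 7
    return (raw >> 1) ^ -(raw & 1)


def read_string(buf: BytesIO) -> str:
    """Read one Avro string: a long length followed by that many UTF-8 bytes."""
    length = read_long(buf)
    if length < 0:
        raise ValueError(f"negative string length {length}")
    data = buf.read(length)
    if len(data) != length:
        raise EOFError("truncated string")
    return data.decode("utf-8")


def decode_framed_record(message: bytes):
    """Strip the 5-byte header, then decode the fields 'name' and 'address'.

    Assumes the writer schema from the question (two non-nullable strings).
    """
    if len(message) < 5:
        raise ValueError("message shorter than the 5-byte header")
    magic, schema_id = struct.unpack(">bI", message[:5])
    if magic != 0:
        raise ValueError(f"unexpected magic byte {magic}")
    buf = BytesIO(message[5:])
    record = {"name": read_string(buf), "address": read_string(buf)}
    return schema_id, record


# Hypothetical framed message: magic byte, schema ID 42, then the two strings.
sample = b"\x00\x00\x00\x00\x2a" + b"\x0cvikram" + b"\x06hyd"
print(decode_framed_record(sample))
```

The same two steps, skipping the header and then reading the payload with the schema the producer actually used, are what any replacement for the Confluent deserializer needs to perform, whatever language it is written in.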