org.apache.avro.Исключение AvroTypeException: неизвестный идентификатор события объединенной ветви

#apache-kafka #avro #confluent-platform #confluent-schema-registry #confluent-cloud

#apache-kafka #avro #confluent-платформа #слияние-схема-реестр #слияние-облако

Вопрос:

Я пытаюсь преобразовать json в avro с помощью ‘kafka-avro-console-producer’ и опубликовать его в теме kafka.

Я могу сделать это в формате json / schema, но для приведенной ниже схемы и json я получаю «org.apache.avro.Ошибка AvroTypeException: неизвестный идентификатор события объединенной ветви».

Буду признателен за любую помощь.

Схема :

 {
    "type": "record",
    "name": "Envelope",
    "namespace": "CoreOLTPEvents.dbo.Event",
    "fields": [{
        "name": "before",
        "type": ["null", {
            "type": "record",
            "name": "Value",
            "fields": [{
                "name": "EventId",
                "type": "long"
            }, {
                "name": "CameraId",
                "type": ["null", "long"],
                "default": null
            }, {
                "name": "SiteId",
                "type": ["null", "long"],
                "default": null
            }],
            "connect.name": "CoreOLTPEvents.dbo.Event.Value"
        }],
        "default": null
    }, {
        "name": "after",
        "type": ["null", "Value"],
        "default": null
    }, {
        "name": "op",
        "type": "string"
    }, {
        "name": "ts_ms",
        "type": ["null", "long"],
        "default": null
    }],
    "connect.name": "CoreOLTPEvents.dbo.Event.Envelope"
}
 

И ввод Json выглядит следующим образом :

 {
    "before": null,
    "after": {
        "EventId": 12,
        "CameraId": 10,
        "SiteId": 11974
    },
    "op": "C",
    "ts_ms": null
}
 

И в моем случае я не могу изменить схему, я могу изменить только json таким образом, чтобы он работал

Ответ №1:

Если вы используете формат Avro JSON, ввод, который у вас есть, немного искажен. Для объединений необходимо указать ненулевые значения, чтобы была указана информация о типе: https://avro.apache.org/docs/current/spec.html#json_encoding

Смотрите Ниже пример, который, я думаю, должен работать.

 {
    "before": null,
    "after": {
        "CoreOLTPEvents.dbo.Event.Value": {
            "EventId": 12,
            "CameraId": {
                "long": 10
            },
            "SiteId": {
                "long": 11974
            }
        }
    },
    "op": "C",
    "ts_ms": null
}
 

Ответ №2:

Удаление "connect.name": "CoreOLTPEvents.dbo.Event.Value" и "connect.name": "CoreOLTPEvents.dbo.Event.Envelope" как The RecordType can only contains {'namespace', 'aliases', 'fields', 'name', 'type', 'doc'} keys . Не могли бы вы попробовать использовать приведенную ниже схему и посмотреть, сможете ли вы создать сообщение об ошибке?

 {
  "type": "record",
  "name": "Envelope",
  "namespace": "CoreOLTPEvents.dbo.Event",
  "fields": [
    {
      "name": "before",
      "type": [
        "null",
        {
          "type": "record",
          "name": "Value",
          "fields": [
            {
              "name": "EventId",
              "type": "long"
            },
            {
              "name": "CameraId",
              "type": [
                "null",
                "long"
              ],
              "default": "null"
            },
            {
              "name": "SiteId",
              "type": [
                "null",
                "long"
              ],
              "default": "null"
            }
          ]
        }
      ],
      "default": null
    },
    {
      "name": "after",
      "type": [
        "null",
        "Value"
      ],
      "default": null
    },
    {
      "name": "op",
      "type": "string"
    },
    {
      "name": "ts_ms",
      "type": [
        "null",
        "long"
      ],
      "default": null
    }
  ]
}