#apache-kafka #avro #confluent-platform #confluent-schema-registry #confluent-cloud
#apache-kafka #avro #confluent-платформа #слияние-схема-реестр #слияние-облако
Вопрос:
Я пытаюсь преобразовать json в avro с помощью ‘kafka-avro-console-producer’ и опубликовать его в теме kafka.
Я могу сделать это в формате json / schema, но для приведенной ниже схемы и json я получаю «org.apache.avro.Ошибка AvroTypeException: неизвестный идентификатор события объединенной ветви».
Буду признателен за любую помощь.
Схема :
{
"type": "record",
"name": "Envelope",
"namespace": "CoreOLTPEvents.dbo.Event",
"fields": [{
"name": "before",
"type": ["null", {
"type": "record",
"name": "Value",
"fields": [{
"name": "EventId",
"type": "long"
}, {
"name": "CameraId",
"type": ["null", "long"],
"default": null
}, {
"name": "SiteId",
"type": ["null", "long"],
"default": null
}],
"connect.name": "CoreOLTPEvents.dbo.Event.Value"
}],
"default": null
}, {
"name": "after",
"type": ["null", "Value"],
"default": null
}, {
"name": "op",
"type": "string"
}, {
"name": "ts_ms",
"type": ["null", "long"],
"default": null
}],
"connect.name": "CoreOLTPEvents.dbo.Event.Envelope"
}
И ввод Json выглядит следующим образом :
{
"before": null,
"after": {
"EventId": 12,
"CameraId": 10,
"SiteId": 11974
},
"op": "C",
"ts_ms": null
}
И в моем случае я не могу изменить схему, я могу изменить только json таким образом, чтобы он работал
Ответ №1:
Если вы используете формат Avro JSON, ввод, который у вас есть, немного искажен. Для объединений необходимо указать ненулевые значения, чтобы была указана информация о типе: https://avro.apache.org/docs/current/spec.html#json_encoding
Смотрите Ниже пример, который, я думаю, должен работать.
{
"before": null,
"after": {
"CoreOLTPEvents.dbo.Event.Value": {
"EventId": 12,
"CameraId": {
"long": 10
},
"SiteId": {
"long": 11974
}
}
},
"op": "C",
"ts_ms": null
}
Ответ №2:
Удаление "connect.name": "CoreOLTPEvents.dbo.Event.Value"
и "connect.name": "CoreOLTPEvents.dbo.Event.Envelope"
как The RecordType can only contains {'namespace', 'aliases', 'fields', 'name', 'type', 'doc'} keys
. Не могли бы вы попробовать использовать приведенную ниже схему и посмотреть, сможете ли вы создать сообщение об ошибке?
{
"type": "record",
"name": "Envelope",
"namespace": "CoreOLTPEvents.dbo.Event",
"fields": [
{
"name": "before",
"type": [
"null",
{
"type": "record",
"name": "Value",
"fields": [
{
"name": "EventId",
"type": "long"
},
{
"name": "CameraId",
"type": [
"null",
"long"
],
"default": "null"
},
{
"name": "SiteId",
"type": [
"null",
"long"
],
"default": "null"
}
]
}
],
"default": null
},
{
"name": "after",
"type": [
"null",
"Value"
],
"default": null
},
{
"name": "op",
"type": "string"
},
{
"name": "ts_ms",
"type": [
"null",
"long"
],
"default": null
}
]
}