Формат сообщения Confluent Kafka producer для вложенных записей

#apache-kafka #avro #confluent-platform #confluent-schema-registry #confluent-kafka-python

#apache-kafka #avro #confluent-платформа #confluent-схема-реестр #confluent-kafka-python

Вопрос:

У меня есть схема AVRO, зарегистрированная в теме kafka, и я пытаюсь отправить в нее данные. В схеме есть вложенные записи, и я не уверен, как я правильно отправляю в нее данные, используя confluent_kafka python.

Пример схемы: * удалите все опечатки в схеме (реальная опечатка очень большая, просто пример)

  {
 "namespace": "company__name",
 "name": "our_data",
 "type": "record",
 "fields": [
           {
            "name": "datatype1",
            "type": ["null", {
                 "type": "record",
                 "name": "datatype1_1",
                 "fields": [ 
                     {"name": "site", "type": "string"},
                     {"name": "units", "type": "string"}
                  ]
             }]
             "default": null
            }
            {
            "name": "datatype2",
            "type": ["null", {
                 "type": "record",
                 "name": "datatype2_1",
                 "fields": [ 
                     {"name": "site", "type": "string"},
                     {"name": "units", "type": "string"}
                  ]
             }]
             "default": null
            }
           ]
          }
 

Я пытаюсь отправить данные в эту схему, используя confluent_kafka версию python. Когда я делал это ранее, записи не были вложенными, и я бы использовал типичные словарные key: value пары и сериализовал их. Как я могу отправить вложенные данные для работы со схемой.

Что я пробовал до сих пор…

 message = {'datatype1': 
            {'site': 'sitename',
             'units': 'm'
            }
           }
 

эта версия не вызывает никаких ошибок kafka, но все столбцы отображаются как null

и…

 message = {'datatype1': 
            {'datatype1_1':
              {'site': 'sitename',
               'units': 'm'
              }
            }
           }
 

Эта версия привела к ошибке kafka со схемой.

Комментарии:

1. У вас отсутствуют кавычки в datatype1 и datatype1_1 в вашем Python dict, так что это недопустимый код, но в чем была ошибка из реестра или Кафки?

2. Извините, это была просто опечатка при написании вопроса. Просто исправлено в вопросе. Ошибка кафки{code=_VALUE_SERIALIZATION,val=-161,str=»{‘datatype1_1’:{‘site’: ‘sitename’, ‘units’: ‘m’}» (тип <класс ‘dict’>) не соответствует (схеме)

3. Это JSON или двоичный файл?

4. И вы пробовали, работает ли это, когда datatype1_1 не обнуляется?

Ответ №1:

Если вы используете пространства имен, вам не нужно беспокоиться о коллизиях имен, и вы можете правильно структурировать свои необязательные записи: например, обе

 {
  "meta": {
    "instanceID": "something"
  }
}
 

И

 {}
 

являются допустимыми экземплярами:

 {
  "doc": "Survey",
  "name": "Survey",
  "type": "record",
  "fields": [
    {
      "name": "meta",
      "type": [
        "null",
        {
          "name": "meta",
          "type": "record",
          "fields": [
            {
              "name": "instanceID",
              "type": [
                "null",
                "string"
              ],
              "namespace": "Survey.meta"
            }
          ],
          "namespace": "Survey"
        }
      ],
      "namespace": "Survey"
    }
  ]
}