Как десериализовать данные avro из сообщения mqtt?

#python #mqtt #avro

#python #mqtt #avro

Вопрос:

Я получаю сериализованные (AVRO) данные в виде сообщения mqtt. Сообщение выглядит примерно так Objavro.codecnullavro.schemaº{"type": "record", "name": "User", "namespace": "example.avro", "fields": [{"type": "string", "name": "name"}, {"type": ["int", "null"], "name": "favorite_number"}, {"type": ["string", "null"], "name": "favorite_color"}]} Œpq ±)žJ@xX·,Alyssa €Ben redŒpq ±)žJ@xX·

Я должен десериализовать эти данные с помощью Python3 с известной схемой user.avsc

 {"namespace": "example.avro",
 "type": "record",
 "name": "User",
 "fields": [
     {"name": "name", "type": "string"},
     {"name": "favorite_number",  "type": ["int", "null"]},
     {"name": "favorite_color", "type": ["string", "null"]}
 ]
}
 

Десериализованные данные должны выглядеть примерно так

 {u'favorite_color': None, u'favorite_number': 256, u'name': u'Alyssa'}
{u'favorite_color': u'red', u'favorite_number': 7, u'name': u'Ben'}
 

С примером, приведенным в https://avro.apache.org/docs/current/gettingstartedpython.html данные записываются / считываются из методов DataFileWriter / Reader, однако было бы здорово иметь это «на лету», например, при поступлении сообщения код python десериализует данные и печатает их.

Логика подписки MQTT уже обработана, и на данный момент она просто печатает входящее сообщение, я хотел бы распечатать десериализованные данные с входящим сообщением.

Я попробовал следующее (логика десериализации):

 import avro.schema
from avro.io import DatumReader, DatumWriter
import io

schema = avro.schema.parse(open("user.avsc", "rb").read())
# message passed here is incoming message
bytes_reader = io.BytesIO(bytes(message, encoding='utf-8'))
decoder = avro.io.BinaryDecoder(bytes_reader)

reader = avro.io.DatumReader(schema)
data = reader.read(decoder)
print(data)
 

Приведенный выше код завершается с ошибкой (TypeError: ord() ожидал символ, но найдена строка длиной 0), поскольку я не смог определить правильный формат для использования в качестве аргумента для метода reader.read() . Причина, по которой я использовал ввод-вывод.BytesIO, поскольку данные поступают в виде строки, я не могу передать строку, и очевидно, что пример со страницы apache считывает данные в двоичном формате и использует их для десериализации.

Спасибо

Ответ №1:

Если сообщение, которое вы получаете от MQTT, имеет строковый формат (а не байты), то вы, вероятно, не сможете его десериализовать. Если вы видите двоичный файл avro в строковом формате, вы не сможете просто закодировать его как UTF-8 и десериализовать. Вам нужен фактический двоичный файл.