#python #mqtt #avro
#python #mqtt #avro
Вопрос:
Я получаю сериализованные (AVRO) данные в виде сообщения mqtt. Сообщение выглядит примерно так Objavro.codecnullavro.schemaº{"type": "record", "name": "User", "namespace": "example.avro", "fields": [{"type": "string", "name": "name"}, {"type": ["int", "null"], "name": "favorite_number"}, {"type": ["string", "null"], "name": "favorite_color"}]} Œpq ±)žJ@xX·,Alyssa €Ben redŒpq ±)žJ@xX·
Я должен десериализовать эти данные с помощью Python3 с известной схемой user.avsc —
{"namespace": "example.avro",
"type": "record",
"name": "User",
"fields": [
{"name": "name", "type": "string"},
{"name": "favorite_number", "type": ["int", "null"]},
{"name": "favorite_color", "type": ["string", "null"]}
]
}
Десериализованные данные должны выглядеть примерно так
{u'favorite_color': None, u'favorite_number': 256, u'name': u'Alyssa'}
{u'favorite_color': u'red', u'favorite_number': 7, u'name': u'Ben'}
С примером, приведенным в https://avro.apache.org/docs/current/gettingstartedpython.html данные записываются / считываются из методов DataFileWriter / Reader, однако было бы здорово иметь это «на лету», например, при поступлении сообщения код python десериализует данные и печатает их.
Логика подписки MQTT уже обработана, и на данный момент она просто печатает входящее сообщение, я хотел бы распечатать десериализованные данные с входящим сообщением.
Я попробовал следующее (логика десериализации):
import avro.schema
from avro.io import DatumReader, DatumWriter
import io
schema = avro.schema.parse(open("user.avsc", "rb").read())
# message passed here is incoming message
bytes_reader = io.BytesIO(bytes(message, encoding='utf-8'))
decoder = avro.io.BinaryDecoder(bytes_reader)
reader = avro.io.DatumReader(schema)
data = reader.read(decoder)
print(data)
Приведенный выше код завершается с ошибкой (TypeError: ord() ожидал символ, но найдена строка длиной 0), поскольку я не смог определить правильный формат для использования в качестве аргумента для метода reader.read() . Причина, по которой я использовал ввод-вывод.BytesIO, поскольку данные поступают в виде строки, я не могу передать строку, и очевидно, что пример со страницы apache считывает данные в двоичном формате и использует их для десериализации.
Спасибо
Ответ №1:
Если сообщение, которое вы получаете от MQTT, имеет строковый формат (а не байты), то вы, вероятно, не сможете его десериализовать. Если вы видите двоичный файл avro в строковом формате, вы не сможете просто закодировать его как UTF-8 и десериализовать. Вам нужен фактический двоичный файл.