#deserialization #protocol-buffers #kafka-python
Вопрос:
У меня есть модель protobuf, как показано ниже: т. Е. protoObj
sample {
barcode: "BAR000002"
requisition_id: "ORDER-1"
patient_id: "P00002"
cancer_type: "CRC"
}
assay {
key: "lunar2"
title: "Lunar2"
version: "3.0"
}
lunar2 {
type: REPORT_TYPE_COMPLETED
version: 1
limitations: "TBD"
disclaimers: "TBD"
header {
test_name: "XYZ"
manufacturer: "ABCD"
sofware_verision: "TBD"
regulatory_status: "IVD"
}
content {
requisition_id: "ORDER-1"
created {
seconds: 1631736582
nanos: 986649036
}
result: "ctDNA DETECTED"
}
}
Теперь я отправляю это продюсеру Кафки, как показано ниже
kafkaProducer.send('topic-test', protoObj.SerializeToString())
Теперь, на стороне потребителя, я хочу удалить сериализацию в JSON и отправить этот JSON в нижестоящие службы, что я делаю ниже:
def consumerMessage(self):
for msg in self.kafkaConsumer:
prototype_pb2.Proto().ParseFromString(msg.value)
print(prototype_pb2)
Но я получаю следующую ошибку:
google.protobuf.message.DecodeError: Error parsing message
Я не понимаю, в чем именно заключается проблема. Пожалуйста, помогите.
Комментарии:
1. У объекта сообщения есть
value
метод, который вы использовалиmsg.value
, а неmsg.value()
. Если вы внесете это изменение и повторите попытку, что вы увидите?2. Спасибо! но у меня была другая реализация, в которой я удаляю сериализацию и преобразую данные в JSON с помощью MessageToJson()