Ошибка декодирования сообщения Protobuf с использованием синтаксического анализа kafka-python

#deserialization #protocol-buffers #kafka-python

Вопрос:

У меня есть модель protobuf, как показано ниже: т. Е. protoObj

 sample {
  barcode: "BAR000002"
  requisition_id: "ORDER-1"
  patient_id: "P00002"
  cancer_type: "CRC"
}
assay {
  key: "lunar2"
  title: "Lunar2"
  version: "3.0"
}
lunar2 {
  type: REPORT_TYPE_COMPLETED
  version: 1
  limitations: "TBD"
  disclaimers: "TBD"
  header {
    test_name: "XYZ"
    manufacturer: "ABCD"
    sofware_verision: "TBD"
    regulatory_status: "IVD"
  }
  content {
    requisition_id: "ORDER-1"
    created {
      seconds: 1631736582
      nanos: 986649036
    }
    result: "ctDNA DETECTED"
  }
}
 

Теперь я отправляю это продюсеру Кафки, как показано ниже

 kafkaProducer.send('topic-test', protoObj.SerializeToString())
 

Теперь, на стороне потребителя, я хочу удалить сериализацию в JSON и отправить этот JSON в нижестоящие службы, что я делаю ниже:

  def consumerMessage(self):
        for msg in self.kafkaConsumer:
            prototype_pb2.Proto().ParseFromString(msg.value)
            print(prototype_pb2)
 

Но я получаю следующую ошибку:

 google.protobuf.message.DecodeError: Error parsing message
 

Я не понимаю, в чем именно заключается проблема. Пожалуйста, помогите.

Комментарии:

1. У объекта сообщения есть value метод, который вы использовали msg.value , а не msg.value() . Если вы внесете это изменение и повторите попытку, что вы увидите?

2. Спасибо! но у меня была другая реализация, в которой я удаляю сериализацию и преобразую данные в JSON с помощью MessageToJson()