Как использовать самоописывающееся сообщение для protbuf

#go #apache-kafka #protocol-buffers #sarama

#Вперед #apache-kafka #протокол-буферы #сарама

Вопрос:

Одним из вариантов использования, над которым я работаю при использовании буферов протокола, является десериализация сообщений Kafka буферов протокола, которые я получаю на стороне потребителя (с использованием библиотеки sarama и Go).

То, как я сейчас делаю, заключается в том, что я определил образец файла pixel.proto, как показано ниже.

 syntax = "proto3";

package saramaprotobuf;

message Pixel {
  // Session identifier stuff
  string session_id = 2;
}
  

я отправляю сообщение через sarama.Производитель (путем его сортировки) получает его sarama.Потребитель (отменяет отправку сообщения, ссылаясь на соответствующий pixel.proto.pb). Код, как показано ниже.

 import (
    "github.com/Shopify/sarama"
    "github.com/golang/protobuf/proto"
    "log"
    "os"
    "os/signal"
    "protobuftest/example"
    "syscall"
    "time"
)

func main() {
    topic := "test_topic"
    brokerList := []string{"localhost:9092"}

    producer, err := newSyncProducer(brokerList)
    if err != nil {
        log.Fatalln("Failed to start Sarama producer:", err)
    }

    go func() {
        ticker := time.NewTicker(time.Second)
        for {
            select {
            case t := <-ticker.C:
                elliot := amp;example.Pixel{
                    SessionId: t.String(),
                }
                pixelToSend :=  elliot
                pixelToSendBytes, err := proto.Marshal(pixelToSend)
                if err != nil {
                    log.Fatalln("Failed to marshal example:", err)
                }

                msg := amp;sarama.ProducerMessage{
                    Topic: topic,
                    Value: sarama.ByteEncoder(pixelToSendBytes),
                }

                producer.SendMessage(msg)
                log.Printf("Pixel sent: %s", pixelToSend)
            }
        }

    }()

    signals := make(chan os.Signal, 1)
    signal.Notify(signals, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM)

    partitionConsumer, err := newPartitionConsumer(brokerList, topic)
    if err != nil {
        log.Fatalln("Failed to create Sarama partition consumer:", err)
    }

    log.Println("Waiting for messages...")

    for {
        select {
        case msg := <-partitionConsumer.Messages():
            receivedPixel := amp;example.Pixel{}
            err := proto.Unmarshal(msg.Value, receivedPixel)
            if err != nil {
                log.Fatalln("Failed to unmarshal example:", err)
            }

            log.Printf("Pixel received: %s", receivedPixel)
        case <-signals:
            log.Print("Received termination signal. Exiting.")
            return
        }
    }
}

func newSyncProducer(brokerList []string) (sarama.SyncProducer, error) {
    config := sarama.NewConfig()
    config.Producer.RequiredAcks = sarama.WaitForAll
    config.Producer.Retry.Max = 5
    config.Producer.Return.Successes = true
    // TODO configure producer

    producer, err := sarama.NewSyncProducer(brokerList, config)
    if err != nil {
        return nil, err
    }

    return producer, nil
}

func newPartitionConsumer(brokerList []string, topic string) (sarama.PartitionConsumer, error) {
    conf := sarama.NewConfig()
    // TODO configure consumer
    consumer, err := sarama.NewConsumer(brokerList, conf)
    if err != nil {
        return nil, err
    }

    partitionConsumer, err := consumer.ConsumePartition(topic, 0, sarama.OffsetOldest)
    if err != nil {
        return nil, err
    }

    return partitionConsumer, err
}
  

В коде, как вы можете видеть, я импортировал файл .proto и ссылался на него в основной функции, чтобы отправлять и получать сообщения. Проблема здесь в том, что решение не является универсальным. Я получу сообщение другого типа .proto на стороне потребителя.

Как я могу сделать его универсальным? Я знаю, что есть нечто, называемое самоописывающимся сообщением (динамическое сообщение) как часть protobuf. Я ссылался на эту ссылку https://developers.google.com/protocol-buffers/docs/techniques?csw=1#self-description . Но в нем нет никаких объяснений о том, как встроить это как часть pixel.proto(пример, который я использовал), чтобы на стороне потребителя я напрямую десериализовал его до требуемого типа.

Ответ №1:

Вы должны определить общий тип сообщения контейнера, который будет включать набор описаний и поля Any.

При отправке вы создаете экземпляр этого общего типа сообщения, устанавливая поле типа Any с экземпляром вашего пиксельного сообщения и устанавливая поле DescriptorSet с набором описаний типа Pixel.

Это позволило бы получателю такого сообщения анализировать любое содержимое, используя присоединяемый вами набор описаний. На практике это отправка части определения прототипа вместе с сообщением. Таким образом, получателям не понадобятся предварительно разделяемые определения прототипов или сгенерированный код.

Сказав это, я не уверен, что это то, чего вы действительно хотите, потому что, если вы планируете делиться прототипными определениями или сгенерированным кодом с клиентами, я бы предложил просто использовать поле oneof в типе контейнера, было бы намного проще в использовании.

Комментарии:

1. Спасибо. @Pablo Да, это то, что я хочу. Не могли бы вы привести мне пример для этого. Это действительно помогло бы. Я не понимаю, как определить DescriptorSet, как вы упомянули

2. не могли бы вы опубликовать пример. Я застрял в присвоении значения DescriptorSet и любому сообщению

3. @coders я был бы рад. У вас есть где-нибудь базовый код, чтобы его изменить?

4. конечно, позвольте мне загрузить его и поделиться с вами ссылкой

5. пожалуйста, найдите ссылку на github для кода — github.com/Sharathnasa/protobuftest . Вы можете редактировать здесь. Пожалуйста, дайте мне знать, если что-то непонятно