#go #apache-kafka #protocol-buffers #sarama
#Вперед #apache-kafka #протокол-буферы #сарама
Вопрос:
Одним из вариантов использования, над которым я работаю при использовании буферов протокола, является десериализация сообщений Kafka буферов протокола, которые я получаю на стороне потребителя (с использованием библиотеки sarama и Go).
То, как я сейчас делаю, заключается в том, что я определил образец файла pixel.proto, как показано ниже.
syntax = "proto3";
package saramaprotobuf;
message Pixel {
// Session identifier stuff
string session_id = 2;
}
я отправляю сообщение через sarama.Производитель (путем его сортировки) получает его sarama.Потребитель (отменяет отправку сообщения, ссылаясь на соответствующий pixel.proto.pb). Код, как показано ниже.
import (
"github.com/Shopify/sarama"
"github.com/golang/protobuf/proto"
"log"
"os"
"os/signal"
"protobuftest/example"
"syscall"
"time"
)
func main() {
topic := "test_topic"
brokerList := []string{"localhost:9092"}
producer, err := newSyncProducer(brokerList)
if err != nil {
log.Fatalln("Failed to start Sarama producer:", err)
}
go func() {
ticker := time.NewTicker(time.Second)
for {
select {
case t := <-ticker.C:
elliot := amp;example.Pixel{
SessionId: t.String(),
}
pixelToSend := elliot
pixelToSendBytes, err := proto.Marshal(pixelToSend)
if err != nil {
log.Fatalln("Failed to marshal example:", err)
}
msg := amp;sarama.ProducerMessage{
Topic: topic,
Value: sarama.ByteEncoder(pixelToSendBytes),
}
producer.SendMessage(msg)
log.Printf("Pixel sent: %s", pixelToSend)
}
}
}()
signals := make(chan os.Signal, 1)
signal.Notify(signals, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM)
partitionConsumer, err := newPartitionConsumer(brokerList, topic)
if err != nil {
log.Fatalln("Failed to create Sarama partition consumer:", err)
}
log.Println("Waiting for messages...")
for {
select {
case msg := <-partitionConsumer.Messages():
receivedPixel := amp;example.Pixel{}
err := proto.Unmarshal(msg.Value, receivedPixel)
if err != nil {
log.Fatalln("Failed to unmarshal example:", err)
}
log.Printf("Pixel received: %s", receivedPixel)
case <-signals:
log.Print("Received termination signal. Exiting.")
return
}
}
}
func newSyncProducer(brokerList []string) (sarama.SyncProducer, error) {
config := sarama.NewConfig()
config.Producer.RequiredAcks = sarama.WaitForAll
config.Producer.Retry.Max = 5
config.Producer.Return.Successes = true
// TODO configure producer
producer, err := sarama.NewSyncProducer(brokerList, config)
if err != nil {
return nil, err
}
return producer, nil
}
func newPartitionConsumer(brokerList []string, topic string) (sarama.PartitionConsumer, error) {
conf := sarama.NewConfig()
// TODO configure consumer
consumer, err := sarama.NewConsumer(brokerList, conf)
if err != nil {
return nil, err
}
partitionConsumer, err := consumer.ConsumePartition(topic, 0, sarama.OffsetOldest)
if err != nil {
return nil, err
}
return partitionConsumer, err
}
В коде, как вы можете видеть, я импортировал файл .proto и ссылался на него в основной функции, чтобы отправлять и получать сообщения. Проблема здесь в том, что решение не является универсальным. Я получу сообщение другого типа .proto на стороне потребителя.
Как я могу сделать его универсальным? Я знаю, что есть нечто, называемое самоописывающимся сообщением (динамическое сообщение) как часть protobuf. Я ссылался на эту ссылку https://developers.google.com/protocol-buffers/docs/techniques?csw=1#self-description . Но в нем нет никаких объяснений о том, как встроить это как часть pixel.proto(пример, который я использовал), чтобы на стороне потребителя я напрямую десериализовал его до требуемого типа.
Ответ №1:
Вы должны определить общий тип сообщения контейнера, который будет включать набор описаний и поля Any.
При отправке вы создаете экземпляр этого общего типа сообщения, устанавливая поле типа Any с экземпляром вашего пиксельного сообщения и устанавливая поле DescriptorSet с набором описаний типа Pixel.
Это позволило бы получателю такого сообщения анализировать любое содержимое, используя присоединяемый вами набор описаний. На практике это отправка части определения прототипа вместе с сообщением. Таким образом, получателям не понадобятся предварительно разделяемые определения прототипов или сгенерированный код.
Сказав это, я не уверен, что это то, чего вы действительно хотите, потому что, если вы планируете делиться прототипными определениями или сгенерированным кодом с клиентами, я бы предложил просто использовать поле oneof в типе контейнера, было бы намного проще в использовании.
Комментарии:
1. Спасибо. @Pablo Да, это то, что я хочу. Не могли бы вы привести мне пример для этого. Это действительно помогло бы. Я не понимаю, как определить DescriptorSet, как вы упомянули
2. не могли бы вы опубликовать пример. Я застрял в присвоении значения DescriptorSet и любому сообщению
3. @coders я был бы рад. У вас есть где-нибудь базовый код, чтобы его изменить?
4. конечно, позвольте мне загрузить его и поделиться с вами ссылкой
5. пожалуйста, найдите ссылку на github для кода — github.com/Sharathnasa/protobuftest . Вы можете редактировать здесь. Пожалуйста, дайте мне знать, если что-то непонятно