Слияние C#.Десериализация объекта Kafka SetValueDeserializer

#c# #apache-kafka #protobuf-net #confluent-kafka-dotnet

Вопрос:

В моем потребителе я хочу десериализовать сообщение протобуфа Кафки. Ключ имеет строковый тип, но значение сообщения является объектом protobuf. Я знаю, что мне нужно создать свой собственный десериализатор для значения сообщения, но я понятия не имею, как его создать. Вот моя потребительская реализация, в которой мне нужно заменить отмеченную строку:

 using Confluent.Kafka;
using System;
using System.Threading;

namespace EventHubsForKafkaSample
{
    class Worker1
    {
        public static void Consumer(string brokerList, string connStr, string consumergroup, string topic, string cacertlocation)
        {
            var config = new ConsumerConfig
            {
                BootstrapServers = brokerList,
                SecurityProtocol = SecurityProtocol.SaslSsl,
                SocketTimeoutMs = 60000,                //this corresponds to the Consumer config `request.timeout.ms`
                SessionTimeoutMs = 30000,
                SaslMechanism = SaslMechanism.Plain,
                SaslUsername = "$ConnectionString",
                SaslPassword = connStr,
                SslCaLocation = cacertlocation,
                GroupId = consumergroup,
                AutoOffsetReset = AutoOffsetReset.Earliest,
                BrokerVersionFallback = "1.0.0",        //Event Hubs for Kafka Ecosystems supports Kafka v1.0 , a fallback to an older API will fail
                //Debug = "security,broker,protocol"    //Uncomment for librdkafka debugging information
            };

            using (var consumer = new ConsumerBuilder<string, ProtobufMessage>(config)
                .SetKeyDeserializer(Deserializers.Utf8)
                .SetValueDeserializer(Deserializers.Utf8) //<<-----
                .Build())
            {
                CancellationTokenSource cts = new CancellationTokenSource();
                Console.CancelKeyPress  = (_, e) => { e.Cancel = true; cts.Cancel(); };

                consumer.Subscribe(topic);

                Console.WriteLine("Consuming messages from topic: "   topic   ", broker(s): "   brokerList);

                while (true)
                {
                    try
                    {
                        var msg = consumer.Consume(cts.Token);
                        Console.WriteLine($"Received: '{msg.Value}'");
                    }
                    catch (ConsumeException e)
                    {
                        Console.WriteLine($"Consume error: {e.Error.Reason}");
                    }
                    catch (Exception e)
                    {
                        Console.WriteLine($"Error: {e.Message}");
                    }
                }
            }
        }
    }

    public class ProtobufMessage
    {
        public DateTime timestamp { get; set; }
        public int inputId { get; set; }
        public double? value { get; set; }
        public int sourceId { get; set; }
        public string inputGuid { get; set; }
    }
}
 

Формат сообщения Protobuf:

 syntax = "proto3";

package ileco.chimp.proto;

import "google/protobuf/timestamp.proto";
import "google/protobuf/wrappers.proto";

option java_package = "ileco.chimp.proto";
option java_outer_classname = "FinalValueProtos";

message FinalValue {
  google.protobuf.Timestamp timestamp = 1;
  uint32 inputId = 2;
  google.protobuf.DoubleValue value = 3;
  uint32 sourceId = 4;
  string inputGuid = 5;
}
 

Комментарии:

1. Обратите внимание, что определение схемы должно быть правильным; см. protogen.marcgravell.com чтобы использовать protobuf-net .proto для создания C#

Ответ №1:

  1. Вам нужно использовать protoc для создания класса C# из схемы
  2. Вам не нужен собственный десериализатор, если вы используете реестр схем. См. Пример кода
      using (var consumer =
         new ConsumerBuilder<string, YourProtoMessage>(consumerConfig)
             .SetValueDeserializer(new ProtobufDeserializer<YourProtoMessage>().AsSyncOverAsync())
     

Если вы не используете реестр схем, вам нужно будет определить свой собственный десериализатор, реализовав IDeserializer , как указано в другом ответе

Комментарии:

1. Я попробовал то же самое, но теперь получаю эту ошибку: Ожидаю значения сообщения с обрамлением реестра схемы слияния. Магический байт был равен 10, ожидая 0

2. Если производитель не использовал реестр схем, то вы не можете использовать этот способ. Вам придется найти этот код/процесс, прежде чем вы сможете когда-либо написать функциональный десериализатор потребителя

3. Звонок AsSyncOverAsync() был для меня решением.

Ответ №2:

Вам нужен класс, реализующий IDeserializer<T> интерфейс, как определено в документации API Кафки. Тогда ваша отмеченная линия будет выглядеть примерно так:

.SetValueDeserializer(new MyCustomDeserializer())

Комментарии:

1. что должно быть в реализации этого пользовательского десериализатора?

2. Предполагая, что у вас установлен Protobuf-net пакет Nuget, ваша реализация десериализации будет выглядеть примерно так: public T Deserialize(ReadOnlySpan<byte> data, bool isNull, SerializationContext context) { using (var stream = new MemoryStream(data)) { return Serializer.Deserialize<T>(stream); } }

3. Когда я попробовал это, я получаю следующую ошибку: Тип не ожидается, и никакой контракт не может быть выведен: GrpcGreeter. Конечное значение в этой строке: возвращаемый сериализатор. Десериализация<T>(поток);

4. @Ali Я бы рекомендовал вам написать модульные тесты для вашего десериализатора вне контекста данных Кафки, поскольку кажется, что данные, которые вы получаете, не соответствуют определенной вами схеме