Есть ли способ использовать confluent Kafka Dotnet JSON сериализатор БЕЗ реестра схемы,

#json #.net #.net-core #apache-kafka #confluent-platform

#json #.net #.net-core #apache-kafka #confluent-платформа

Вопрос:

Глядя на https://github.com/confluentinc/confluent-kafka-dotnet/blob/master/examples/JsonSerialization/Program.cs для этого требуется URL-адрес реестра схемы. Есть ли простой способ сериализации / десериализации JSON без этой дополнительной сложности?

Ответ №1:

Попробуйте это:

Модель:

 public class KafkaMessage : ISerializer<KafkaMessage>, IDeserializer<KafkaMessage>
{
    public MessageType MessageType { get; set; }
    public string MessageData { get; set; }

    public byte[] Serialize(KafkaMessage data, SerializationContext context)
    {
        using (var ms = new MemoryStream())
        {
            string jsonString = JsonSerializer.Serialize(data);
            var writer = new StreamWriter(ms);

            writer.Write(jsonString);
            writer.Flush();
            ms.Position = 0;

            return ms.ToArray();
        }
    }

    KafkaMessage IDeserializer<KafkaMessage>.Deserialize(ReadOnlySpan<byte> data, bool isNull, SerializationContext context)
    {
        return JsonSerializer.Deserialize<KafkaMessage>(data.ToArray());
    }
}
 

Производитель:

 using (var producer = new ProducerBuilder<Null, KafkaMessage>(_config)
    .SetValueSerializer(new KafkaMessage())
    .Build())
{
    KafkaMessage km = new KafkaMessage { MessageType = messageType, MessageData = messageData };
    var deliveryResult = await producer
        .ProduceAsync(_topic, new Message<Null, KafkaMessage> { Value = km })
        .ContinueWith(task => !task.IsFaulted);

    result = deliveryResu<
}
 

Потребитель:

 using (var consumer = new ConsumerBuilder<Ignore, KafkaMessage>(_config)
    .SetValueDeserializer(new KafkaMessage())
    .SetLogHandler((_, logHandler) => {  })
    .SetErrorHandler((_, errorHandler) => { })
    .Build())
{ 
    consumer.Subscribe(_topic);
    try
    {
        while (!token.IsCancellationRequested)
        {
            var result = consumer.Consume(token);
            var messageData = result.Message.Value;
            Console.log($"Output: { messageData.MessageData }")
        }
    }
    catch (OperationCanceledException)
    {
        /* Close connection */
        consumer.Close();
    }
}
 

Комментарии:

1. Вы не должны называть сериализатор так, как KafkaMessage будто это тип, используемый в другом месте. Другими словами, разделите классы сериализатора / десериализации и модели

2. Не могли бы вы, пожалуйста, привести пример разделения классов сериализатора / десериализации / модели? Чтобы сериализовать / десериализовать модель, модель должна РЕАЛИЗОВАТЬ iSerializer / iDeserializer.

3. «Должен» в соответствии с чем? Модель на самом деле этого не делает. Это все равно, что сказать, что все типы строк или Int32 должны реализовывать эти интерфейсы. Посмотрите на существующие сериализаторы в качестве примеров — github.com/confluentinc/confluent-kafka-dotnet/blob/master/src /…

4. Другими словами, вы можете использовать обобщения, такие как ConsumerBuilder<Ignore, KafkaMessage> , и тогда у вас есть .SetVauleDeserializer(new MyCustomDeserializer()) , но MyCustomDeserializer просто определяет Deserialize метод, фактически не содержит никаких полей с отслеживанием состояния

5. Да, извините, я согласен, лучшим подходом было бы разделить классы сериализатора / десериализации и модели, используя дженерики. Спасибо за помощь в улучшении моего кода.

Ответ №2:

Предоставленная вами ссылка предназначена для схемы JSON, а не для простого JSON.

JSON — это формат открытого текста. Используйте Utf8Serializer и отправляйте строки после преобразования любого класса модели или словаря в строку JSON.

Однако это не гарантирует (на стороне сервера), что ваши сообщения соответствуют любому согласованному формату (т. Е. Определен Требуемый набор полей), поэтому именно здесь вы хотели бы ввести схему и использовать реестр