#json #.net #.net-core #apache-kafka #confluent-platform
#json #.net #.net-core #apache-kafka #confluent-платформа
Вопрос:
Глядя на https://github.com/confluentinc/confluent-kafka-dotnet/blob/master/examples/JsonSerialization/Program.cs для этого требуется URL-адрес реестра схемы. Есть ли простой способ сериализации / десериализации JSON без этой дополнительной сложности?
Ответ №1:
Попробуйте это:
Модель:
public class KafkaMessage : ISerializer<KafkaMessage>, IDeserializer<KafkaMessage>
{
public MessageType MessageType { get; set; }
public string MessageData { get; set; }
public byte[] Serialize(KafkaMessage data, SerializationContext context)
{
using (var ms = new MemoryStream())
{
string jsonString = JsonSerializer.Serialize(data);
var writer = new StreamWriter(ms);
writer.Write(jsonString);
writer.Flush();
ms.Position = 0;
return ms.ToArray();
}
}
KafkaMessage IDeserializer<KafkaMessage>.Deserialize(ReadOnlySpan<byte> data, bool isNull, SerializationContext context)
{
return JsonSerializer.Deserialize<KafkaMessage>(data.ToArray());
}
}
Производитель:
using (var producer = new ProducerBuilder<Null, KafkaMessage>(_config)
.SetValueSerializer(new KafkaMessage())
.Build())
{
KafkaMessage km = new KafkaMessage { MessageType = messageType, MessageData = messageData };
var deliveryResult = await producer
.ProduceAsync(_topic, new Message<Null, KafkaMessage> { Value = km })
.ContinueWith(task => !task.IsFaulted);
result = deliveryResu<
}
Потребитель:
using (var consumer = new ConsumerBuilder<Ignore, KafkaMessage>(_config)
.SetValueDeserializer(new KafkaMessage())
.SetLogHandler((_, logHandler) => { })
.SetErrorHandler((_, errorHandler) => { })
.Build())
{
consumer.Subscribe(_topic);
try
{
while (!token.IsCancellationRequested)
{
var result = consumer.Consume(token);
var messageData = result.Message.Value;
Console.log($"Output: { messageData.MessageData }")
}
}
catch (OperationCanceledException)
{
/* Close connection */
consumer.Close();
}
}
Комментарии:
1. Вы не должны называть сериализатор так, как
KafkaMessage
будто это тип, используемый в другом месте. Другими словами, разделите классы сериализатора / десериализации и модели2. Не могли бы вы, пожалуйста, привести пример разделения классов сериализатора / десериализации / модели? Чтобы сериализовать / десериализовать модель, модель должна РЕАЛИЗОВАТЬ iSerializer / iDeserializer.
3. «Должен» в соответствии с чем? Модель на самом деле этого не делает. Это все равно, что сказать, что все типы строк или Int32 должны реализовывать эти интерфейсы. Посмотрите на существующие сериализаторы в качестве примеров — github.com/confluentinc/confluent-kafka-dotnet/blob/master/src /…
4. Другими словами, вы можете использовать обобщения, такие как
ConsumerBuilder<Ignore, KafkaMessage>
, и тогда у вас есть.SetVauleDeserializer(new MyCustomDeserializer())
, ноMyCustomDeserializer
просто определяетDeserialize
метод, фактически не содержит никаких полей с отслеживанием состояния5. Да, извините, я согласен, лучшим подходом было бы разделить классы сериализатора / десериализации и модели, используя дженерики. Спасибо за помощь в улучшении моего кода.
Ответ №2:
Предоставленная вами ссылка предназначена для схемы JSON, а не для простого JSON.
JSON — это формат открытого текста. Используйте Utf8Serializer
и отправляйте строки после преобразования любого класса модели или словаря в строку JSON.
Однако это не гарантирует (на стороне сервера), что ваши сообщения соответствуют любому согласованному формату (т. Е. Определен Требуемый набор полей), поэтому именно здесь вы хотели бы ввести схему и использовать реестр