#c# #apache-kafka
#c# #apache-kafka
Вопрос:
Как вы это делаете в C #? Библиотека, похоже, не соответствует некоторым другим реализациям.
Я подумал что-то вроде
using (var c = new Consumer<Ignore, string>(conf)) {
c.Subscribe(topic);
c.Assignment
.ForEach(partition => c.Seek(new TopicPartitionOffset(partition, Offset.Beginning)));
}
но изначально нет назначений. Если я подключу событие к OnPartitionsAssigned
, у потребителя все еще не будет назначенного раздела, и хотя у события есть раздел, потребитель не сможет его найти.
Я полагаю, что есть стандартный способ сделать это, но я не уверен, что это такое.
Ответ №1:
Это было бы что-то вроде этого. вы должны просто знать о номере раздела, который здесь 0
:
using (var c = new Consumer<Ignore, string>(conf))
{
TopicPartitionOffset tps = new TopicPartitionOffset(new TopicPartition("TOPIC", 0),
Offset.Beginning);
c.Assign(tps);
}
Комментарии:
1. Что ж, это работает, но кажется немного болезненным, если у вас несколько разделов
Ответ №2:
Этот пример оказался абсолютно рабочим для меня в той же ситуации:
public IConsumer<string, string> Consumer { get; }
public KCKeysKafkaConsumer(ConsumerConfig conf, string topic)
{
Consumer = new ConsumerBuilder<string, string>(conf)
.SetPartitionsAssignedHandler((c, partitions) =>
{
return partitions.Select(tp => new TopicPartitionOffset(tp, Offset.Beginning));
}).Build();
Consumer.Subscribe(topic);
}