#c# #tpl-dataflow
#c# #tpl-поток данных
Вопрос:
Я создаю конвейер, где вам нужно:
- чтение из файлов и выполнение преобразований [TRANSFORM_1];
- с преобразованными данными я должен выполнить 2 процедуры:
- выполните другое преобразование (очень дорогое для процессора) [TRANSFORM_2]
- выполните действие, которое мне понадобится для выполнения других действий, не важных для целей вопроса [ACTION_1] .
Ниже приведена карта с тем, как я думал сделать:
ReadingFiles() -> TRANSFORM1_ -> BROADCASTBLOCK -> TRANSFORM2_ -> ... -> _______________________________________________ -> ACTION_1
Требования
должны быть выполнены два пункта:
- Не перегружайте доступную память;
- Все отправленные сообщения должны поступать в последний блок.
Половинное решение
Чтобы выполнить первое требование, я просто установил ExecutionDataflowBlockOptions
с BoundedCapacity = n
в различных блоках. Однако при использовании обратного давления с BroadcastBlock нет гарантии, что все сообщения будут отправлены.
Вопрос
если других решений нет, как можно реализовать класс, который создает пользовательский широковещательный блок? Я бы создал класс, который работает как GuaranteedBroadCastBlock, но реализует интерфейс IPropagatorBlock
на данный момент я прочитал только несколько примеров, где они создают ActionBlock, который действует как трансляция через метод, но я думаю, что лучше всего создать пользовательский класс, а не метод
Примечание: О первой части конвейера (при чтении из файла и отправке в TRANSFORM_1) Я уже знаю, что вам нужно использовать await TRANSFORM_1.SendAsync ()
, чтобы гарантировать, что все сообщения отправляются в первый блок. Проблема заключается в блоке широковещательной рассылки, который отправляет самый последний.
Комментарии:
1. Правильно ли вы ожидаете выполнения
SendAsync
задач? ПредполагаетсяSendAsync
, что одновременно в полете выполняется не более нескольких операций.2. Когда все блоки имеют ограниченную емкость, это создает обратное давление и, вероятно, для первого блока, который должен использовать SendAsync, в этот момент код блокируется до тех пор, пока что-то не может быть отправлено
3. В любом случае, трудно понять, что происходит, без минимального примера
4. но если внутри конвейера у нас есть BroadcastBlock, как мы можем гарантировать отправку всех сообщений?
5. Трансляция будет отбрасывать сообщения, если она столкнется с обратным давлением. Однако сначала мы должны выяснить, зачем вам нужен широковещательный блок, и если он вам действительно нужен (к чему я отношусь скептически), тогда есть другие варианты. Однако это будет зависеть именно от вашего дизайна. Чего мы не знаем
Ответ №1:
Я думал о написании этого класса, взяв пример из других примеров классов и кое-что изменив.
- Мнения?
- Есть ли что-то, что не подходит?
- Помимо наследования интерфейса ITargetBlock, могу ли я наследовать интерфейс IPropagatorBlock <T, U> и улучшить связь с другими целевыми блоками?
- является ли этот класс потокобезопасным? что может произойти, если я создам несколько экземпляров этого класса?
публичный класс GuaranteedBroadcastBlock : ITargetBlock
{
private readonly List<ITargetBlock<T>> _targets;
private readonly ITargetBlock<T> block;
public GuaranteedBroadcastBlock(DataflowBlockOptions options)
{
var task = new List<Task>();
block = new ActionBlock<T>(
async item =>
{
foreach (var target in _targets)
{
task.Add(target.SendAsync(item));
}
await Task.WhenAll(task);
}, new ExecutionDataflowBlockOptions
{
BoundedCapacity = options.BoundedCapacity,
CancellationToken = options.CancellationToken
});
}
public Task Completion => block.Completion.ContinueWith(task =>
{
foreach (var target in _targets)
{
if (task.Exception != null)
target.Fault(task.Exception);
else
target.Complete();
}
});
public void AddTarget(ITargetBlock<T> target) => _targets.Add(target);
public void Complete() => block.Complete();
public void Fault(Exception exception)
{
Console.WriteLine("Error in GuaranteedBroadcastBlock");
}
public DataflowMessageStatus OfferMessage(DataflowMessageHeader messageHeader, T messageValue, ISourceBlock<T> source, bool consumeToAccept)
{
return block.OfferMessage(messageHeader, messageValue, source, consumeToAccept);
}
}
Комментарии:
1. ТАК же как и сайт вопросов и ответов по конкретным вопросам программирования. Если вы хотите запросить проверку кода, отправьте сообщение по адресу codereview.stackexchange.com . Поток данных не переполняет память. Он уже предлагает ограниченный DOP, ограниченную пропускную способность и переработку задач для получения запросов на обратное дросселирование, создания противодавления и предотвращения монополизации ядер задачами. Все, что вам нужно сделать, это указать BoundedCapacity и передавать сообщения в начало конвейера с помощью
await headBlock.SendAsync
2. Итак, то, что вы опубликовали, не нужно. Похоже, единственное , что вам нужно, это обработка сообщений об ошибках. Самый простой способ сделать это — не отключать обработку ошибок и не позволять катастрофическим ошибкам оставаться незамеченными. Вместо этого обработайте ошибки и распространите ваши блоки
Result<T>
, которые могут быть либо объектом успеха, либо объектом ошибки. Это называется железнодорожно-ориентированным программированием . В потоке данных TPL вы можете использоватьLinkTo
предикат для перенаправления сообщений об ошибках в блоки регистратора вместо их распространения3. PS: Вам также не нужен блок «GuaranteedBroadcast». В связанном ответе ответ svick чрезвычайно прост — просто цикл, который отправляет сообщения в целевые блоки. Вам больше ничего не нужно. Блоки потока данных просты при правильном использовании. Модель, которую вы должны иметь в виду, — это конвейер командной оболочки, а не полная программа в коробке
4. программа, которую я использую, выполнена в winform, потому что мне нужна графическая обратная связь и потому, что есть другие вставленные вещи, которые не связаны с вопросом, который я задал. Я хотел просто соединять блоки, когда захочу, а не все сразу. И тогда @Theodor ответил отлично
5. За исключением того, что этот код вообще не нужен. Код Svick работает просто отлично. И Winforms не меняет принцип работы потока данных. Вы можете запустить любой блок, который хотите, в потоке пользовательского интерфейса, указав параметр TaskScheduler . То, что вы получили, определенно не «просто»