Создайте конвейер, который не перегружает память

#c# #tpl-dataflow

#c# #tpl-поток данных

Вопрос:

Я создаю конвейер, где вам нужно:

  • чтение из файлов и выполнение преобразований [TRANSFORM_1];
  • с преобразованными данными я должен выполнить 2 процедуры:
    • выполните другое преобразование (очень дорогое для процессора) [TRANSFORM_2]
    • выполните действие, которое мне понадобится для выполнения других действий, не важных для целей вопроса [ACTION_1] .

Ниже приведена карта с тем, как я думал сделать:

ReadingFiles() -> TRANSFORM1_ -> BROADCASTBLOCK -> TRANSFORM2_ -> ... -> _______________________________________________ -> ACTION_1

Требования

должны быть выполнены два пункта:

  • Не перегружайте доступную память;
  • Все отправленные сообщения должны поступать в последний блок.

Половинное решение

Чтобы выполнить первое требование, я просто установил ExecutionDataflowBlockOptions с BoundedCapacity = n в различных блоках. Однако при использовании обратного давления с BroadcastBlock нет гарантии, что все сообщения будут отправлены.

Вопрос

если других решений нет, как можно реализовать класс, который создает пользовательский широковещательный блок? Я бы создал класс, который работает как GuaranteedBroadCastBlock, но реализует интерфейс IPropagatorBlock

на данный момент я прочитал только несколько примеров, где они создают ActionBlock, который действует как трансляция через метод, но я думаю, что лучше всего создать пользовательский класс, а не метод

Примечание: О первой части конвейера (при чтении из файла и отправке в TRANSFORM_1) Я уже знаю, что вам нужно использовать await TRANSFORM_1.SendAsync () , чтобы гарантировать, что все сообщения отправляются в первый блок. Проблема заключается в блоке широковещательной рассылки, который отправляет самый последний.

Комментарии:

1. Правильно ли вы ожидаете выполнения SendAsync задач? Предполагается SendAsync , что одновременно в полете выполняется не более нескольких операций.

2. Когда все блоки имеют ограниченную емкость, это создает обратное давление и, вероятно, для первого блока, который должен использовать SendAsync, в этот момент код блокируется до тех пор, пока что-то не может быть отправлено

3. В любом случае, трудно понять, что происходит, без минимального примера

4. но если внутри конвейера у нас есть BroadcastBlock, как мы можем гарантировать отправку всех сообщений?

5. Трансляция будет отбрасывать сообщения, если она столкнется с обратным давлением. Однако сначала мы должны выяснить, зачем вам нужен широковещательный блок, и если он вам действительно нужен (к чему я отношусь скептически), тогда есть другие варианты. Однако это будет зависеть именно от вашего дизайна. Чего мы не знаем

Ответ №1:

Я думал о написании этого класса, взяв пример из других примеров классов и кое-что изменив.

  • Мнения?
  • Есть ли что-то, что не подходит?
  • Помимо наследования интерфейса ITargetBlock, могу ли я наследовать интерфейс IPropagatorBlock <T, U> и улучшить связь с другими целевыми блоками?
  • является ли этот класс потокобезопасным? что может произойти, если я создам несколько экземпляров этого класса?

публичный класс GuaranteedBroadcastBlock : ITargetBlock

 {
    private readonly List<ITargetBlock<T>> _targets;
    private readonly ITargetBlock<T> block;

    public GuaranteedBroadcastBlock(DataflowBlockOptions options)
    {
        var task = new List<Task>();
        block = new ActionBlock<T>(
            async item =>
            {
                foreach (var target in _targets)
                {
                    task.Add(target.SendAsync(item));
                }
                await Task.WhenAll(task);
            }, new ExecutionDataflowBlockOptions
                   {
                       BoundedCapacity = options.BoundedCapacity,
                       CancellationToken = options.CancellationToken
                   });
        
    }

    public Task Completion => block.Completion.ContinueWith(task =>
    {
        foreach (var target in _targets)
        {
            if (task.Exception != null)
                target.Fault(task.Exception);
            else
                target.Complete();
        }
    });

    public void AddTarget(ITargetBlock<T> target) => _targets.Add(target);

    public void Complete() => block.Complete();

    public void Fault(Exception exception)
    {
        Console.WriteLine("Error in GuaranteedBroadcastBlock");
    }

    public DataflowMessageStatus OfferMessage(DataflowMessageHeader messageHeader, T messageValue, ISourceBlock<T> source, bool consumeToAccept)
    {
        return block.OfferMessage(messageHeader, messageValue, source, consumeToAccept);
    }

}
  

Комментарии:

1. ТАК же как и сайт вопросов и ответов по конкретным вопросам программирования. Если вы хотите запросить проверку кода, отправьте сообщение по адресу codereview.stackexchange.com . Поток данных не переполняет память. Он уже предлагает ограниченный DOP, ограниченную пропускную способность и переработку задач для получения запросов на обратное дросселирование, создания противодавления и предотвращения монополизации ядер задачами. Все, что вам нужно сделать, это указать BoundedCapacity и передавать сообщения в начало конвейера с помощью await headBlock.SendAsync

2. Итак, то, что вы опубликовали, не нужно. Похоже, единственное , что вам нужно, это обработка сообщений об ошибках. Самый простой способ сделать это — не отключать обработку ошибок и не позволять катастрофическим ошибкам оставаться незамеченными. Вместо этого обработайте ошибки и распространите ваши блоки Result<T> , которые могут быть либо объектом успеха, либо объектом ошибки. Это называется железнодорожно-ориентированным программированием . В потоке данных TPL вы можете использовать LinkTo предикат для перенаправления сообщений об ошибках в блоки регистратора вместо их распространения

3. PS: Вам также не нужен блок «GuaranteedBroadcast». В связанном ответе ответ svick чрезвычайно прост — просто цикл, который отправляет сообщения в целевые блоки. Вам больше ничего не нужно. Блоки потока данных просты при правильном использовании. Модель, которую вы должны иметь в виду, — это конвейер командной оболочки, а не полная программа в коробке

4. программа, которую я использую, выполнена в winform, потому что мне нужна графическая обратная связь и потому, что есть другие вставленные вещи, которые не связаны с вопросом, который я задал. Я хотел просто соединять блоки, когда захочу, а не все сразу. И тогда @Theodor ответил отлично

5. За исключением того, что этот код вообще не нужен. Код Svick работает просто отлично. И Winforms не меняет принцип работы потока данных. Вы можете запустить любой блок, который хотите, в потоке пользовательского интерфейса, указав параметр TaskScheduler . То, что вы получили, определенно не «просто»