Очередь потока данных TPL с отсрочкой

#c# #parallel-processing #task-parallel-library #dataflow #tpl-dataflow

#c# #параллельная обработка #задача-параллельная-библиотека #поток данных #tpl-поток данных

Вопрос:

Я одновременно обрабатываю очередь, используя ActionBlock .

Единственная загвоздка здесь в том, что при обработке элемента в очереди я могу захотеть подождать, пока зависимость не будет удовлетворена обработкой другого элемента в очереди.

Я думаю, что я смогу сделать это с помощью библиотеки потоков данных TPL со связыванием, отсрочкой и освобождением отсрочки, но я не уверен, какие конструкции использовать.

В псевдокоде:

 public class Item 
{
    public string Name { get; set; }
    public List<string> DependsOn = new List<string>();
}

ActionBlock<Item> block = null;
var block = new ActionBlock<Item>(o => {
    if (!HasActionBlockProcessedAllDependencies(o.DependsOn)) 
    {
       // enqueue a callback when ALL dependencies have been completed
    } 
    else 
    {
        DoWork(o);
    }
},
new ExecutionDataflowBlockOptions { 
    MaxDegreeOfParallelism = resourceProcessorOptions.MaximumProviderConcurrency
});

var items = new[] 
{
    new Item { Name = "Apple", DependsOn = { "Pear" } },
    new Item { Name = "Pear" }
}
  

Комментарии:

1. Итак, вы не хотите обрабатывать элемент, если не были обработаны другие элементы, от которых он зависит?

2. Да, и я хочу повторно активировать элемент, как только все его зависимости будут удовлетворены наиболее эффективным из возможных способов.

3. Это в более крупном конвейере? Причина, по которой я спрашиваю, заключается в том, что здесь мало что решается с потоком данных. Хотя это и не невозможно, кажется, что вы просто хотите сохранить словарь зависимостей с зависимыми, и когда вы обрабатываете зависимость, публикуете их зависимые

4. да, это большой конвейер, и я хочу обрабатывать 10 элементов одновременно

5. Связывание здесь вам не поможет. Существует несколько подходов, более сложный подход заключается в создании пользовательского блока на основе словаря очередей. однако, возможно, было бы проще не отправлять в первый блок, если он не был обработан, с аналогичной структурой относительно того, что было объяснено

Ответ №1:

Я не уверен, будет ли это вам полезно, но вот пользовательский DependencyTransformBlock класс, который знает о зависимостях между получаемыми элементами и обрабатывает каждый из них только после успешной обработки его зависимостей. Этот пользовательский блок поддерживает все встроенные функции обычного TransformBlock , за исключением EnsureOrdered опции.

Конструкторы этого класса принимают Func<TInput, TKey> лямбда-выражение для извлечения ключа каждого элемента и Func<TInput, IReadOnlyCollection<TKey>> лямбда-выражение для извлечения его зависимостей. Ожидается, что ключи будут уникальными. В случае обнаружения дубликата ключа блок завершится сбоем.

В случае циклических зависимостей между элементами затронутые элементы останутся необработанными. Свойство TInput[] Unprocessed позволяет извлекать необработанные элементы после завершения блока. Элемент также может оставаться необработанным, если какая-либо из его зависимостей не указана.

 public class DependencyTransformBlock<TInput, TKey, TOutput> :
    ITargetBlock<TInput>, ISourceBlock<TOutput>
{
    private readonly ITargetBlock<TInput> _inputBlock;
    private readonly IPropagatorBlock<Item, TOutput> _transformBlock;

    private readonly object _locker = new object();
    private readonly Dictionary<TKey, Item> _items;

    private int _pendingCount = 1;
    // The initial 1 represents the completion of the _inputBlock

    private class Item
    {
        public TKey Key;
        public TInput Input;
        public bool HasInput;
        public bool IsCompleted;
        public HashSet<Item> Dependencies;
        public HashSet<Item> Dependents;

        public Item(TKey key) => Key = key;
    }

    public DependencyTransformBlock(
        Func<TInput, Task<TOutput>> transform,
        Func<TInput, TKey> keySelector,
        Func<TInput, IReadOnlyCollection<TKey>> dependenciesSelector,
        ExecutionDataflowBlockOptions dataflowBlockOptions = null,
        IEqualityComparer<TKey> keyComparer = null)
    {
        if (transform == null)
            throw new ArgumentNullException(nameof(transform));
        if (keySelector == null)
            throw new ArgumentNullException(nameof(keySelector));
        if (dependenciesSelector == null)
            throw new ArgumentNullException(nameof(dependenciesSelector));

        dataflowBlockOptions =
            dataflowBlockOptions ?? new ExecutionDataflowBlockOptions();
        keyComparer = keyComparer ?? EqualityComparer<TKey>.Defau<

        _items = new Dictionary<TKey, Item>(keyComparer);

        _inputBlock = new ActionBlock<TInput>(async input =>
        {
            var key = keySelector(input);
            var dependencyKeys = dependenciesSelector(input);
            bool isReadyForProcessing = true;
            Item item;
            lock (_locker)
            {
                if (!_items.TryGetValue(key, out item))
                {
                    item = new Item(key);
                    _items.Add(key, item);
                }
                if (item.HasInput)
                    throw new InvalidOperationException($"Duplicate key ({key}).");
                item.Input = input;
                item.HasInput = true;

                if (dependencyKeys != null amp;amp; dependencyKeys.Count > 0)
                {
                    item.Dependencies = new HashSet<Item>();
                    foreach (var dependencyKey in dependencyKeys)
                    {
                        if (!_items.TryGetValue(dependencyKey, out var dependency))
                        {
                            dependency = new Item(dependencyKey);
                            _items.Add(dependencyKey, dependency);
                        }
                        if (!dependency.IsCompleted)
                        {
                            item.Dependencies.Add(dependency);
                            if (dependency.Dependents == null)
                                dependency.Dependents = new HashSet<Item>();
                            dependency.Dependents.Add(item);
                        }
                    }
                    isReadyForProcessing = item.Dependencies.Count == 0;
                }
                if (isReadyForProcessing) _pendingCount  ;
            }
            if (isReadyForProcessing)
            {
                await _transformBlock.SendAsync(item);
            }
        }, new ExecutionDataflowBlockOptions()
        {
            CancellationToken = dataflowBlockOptions.CancellationToken,
            BoundedCapacity = 1
        });

        var middleBuffer = new BufferBlock<Item>(new DataflowBlockOptions()
        {
            CancellationToken = dataflowBlockOptions.CancellationToken,
            BoundedCapacity = DataflowBlockOptions.Unbounded
        });

        _transformBlock = new TransformBlock<Item, TOutput>(async item =>
        {
            try
            {
                TInput input;
                lock (_locker)
                {
                    Debug.Assert(item.HasInput amp;amp; !item.IsCompleted);
                    input = item.Input;
                }
                var result = await transform(input).ConfigureAwait(false);
                lock (_locker)
                {
                    item.IsCompleted = true;
                    if (item.Dependents != null)
                    {
                        foreach (var dependent in item.Dependents)
                        {
                            Debug.Assert(dependent.Dependencies != null);
                            var removed = dependent.Dependencies.Remove(item);
                            Debug.Assert(removed);
                            if (dependent.HasInput
                                amp;amp; dependent.Dependencies.Count == 0)
                            {
                                middleBuffer.Post(dependent);
                                _pendingCount  ;
                            }
                        }
                    }
                    item.Input = default; // Cleanup
                    item.Dependencies = null;
                    item.Dependents = null;
                }
                return resu<
            }
            finally
            {
                lock (_locker)
                {
                    _pendingCount--;
                    if (_pendingCount == 0) middleBuffer.Complete();
                }
            }
        }, dataflowBlockOptions);

        middleBuffer.LinkTo(_transformBlock);

        PropagateCompletion(_inputBlock, middleBuffer,
            condition: () => { lock (_locker) return --_pendingCount == 0; });
        PropagateCompletion(middleBuffer, _transformBlock);
        PropagateFailure(_transformBlock, middleBuffer);
        PropagateFailure(_transformBlock, _inputBlock);
    }

    // Constructor with synchronous lambda
    public DependencyTransformBlock(
        Func<TInput, TOutput> transform,
        Func<TInput, TKey> keySelector,
        Func<TInput, IReadOnlyCollection<TKey>> dependenciesSelector,
        ExecutionDataflowBlockOptions dataflowBlockOptions = null,
        IEqualityComparer<TKey> keyComparer = null) : this(
            input => Task.FromResult(transform(input)),
            keySelector, dependenciesSelector, dataflowBlockOptions, keyComparer)
    {
        if (transform == null) throw new ArgumentNullException(nameof(transform));
    }

    public TInput[] Unprocessed
    {
        get
        {
            lock (_locker) return _items.Values
                .Where(item => item.HasInput amp;amp; !item.IsCompleted)
                .Select(item => item.Input)
                .ToArray();
        }
    }

    public Task Completion => _transformBlock.Completion;
    public void Complete() => _inputBlock.Complete();
    void IDataflowBlock.Fault(Exception ex) => _inputBlock.Fault(ex);

    DataflowMessageStatus ITargetBlock<TInput>.OfferMessage(
        DataflowMessageHeader header, TInput value, ISourceBlock<TInput> source,
        bool consumeToAccept)
    {
        return _inputBlock.OfferMessage(header, value, source, consumeToAccept);
    }

    TOutput ISourceBlock<TOutput>.ConsumeMessage(DataflowMessageHeader header,
        ITargetBlock<TOutput> target, out bool messageConsumed)
    {
        return _transformBlock.ConsumeMessage(header, target, out messageConsumed);
    }

    bool ISourceBlock<TOutput>.ReserveMessage(DataflowMessageHeader header,
        ITargetBlock<TOutput> target)
    {
        return _transformBlock.ReserveMessage(header, target);
    }

    void ISourceBlock<TOutput>.ReleaseReservation(DataflowMessageHeader header,
        ITargetBlock<TOutput> target)
    {
        _transformBlock.ReleaseReservation(header, target);
    }

    public IDisposable LinkTo(ITargetBlock<TOutput> target,
        DataflowLinkOptions linkOptions)
    {
        return _transformBlock.LinkTo(target, linkOptions);
    }

    private async void PropagateCompletion(IDataflowBlock source,
        IDataflowBlock target, Func<bool> condition = null)
    {
        try { await source.Completion.ConfigureAwait(false); } catch { }
        if (source.Completion.IsFaulted)
            target.Fault(source.Completion.Exception.InnerException);
        else
            if (condition == null || condition()) target.Complete();
    }

    private async void PropagateFailure(IDataflowBlock source,
        IDataflowBlock target)
    {
        try { await source.Completion.ConfigureAwait(false); } catch { }
        if (source.Completion.IsFaulted)
            target.Fault(source.Completion.Exception.InnerException);
    }
}
  

Пример использования:

 var block = new DependencyTransformBlock<Item, string, Item>(item =>
{
    DoWork(item);
    return item;
},
keySelector: item => item.Name,
dependenciesSelector: item => item.DependsOn,
new ExecutionDataflowBlockOptions
{
    MaxDegreeOfParallelism = Environment.ProcessorCount
},
keyComparer: StringComparer.OrdinalIgnoreCase);

//...

block.LinkTo(DataflowBlock.NullTarget<Item>());
  

В этом примере блок связан с a NullTarget , чтобы отменить его вывод, так что он становится по существу ActionBlock эквивалентом.