#c# #parallel-processing #task-parallel-library #dataflow #tpl-dataflow
#c# #параллельная обработка #задача-параллельная-библиотека #поток данных #tpl-поток данных
Вопрос:
Я одновременно обрабатываю очередь, используя ActionBlock
.
Единственная загвоздка здесь в том, что при обработке элемента в очереди я могу захотеть подождать, пока зависимость не будет удовлетворена обработкой другого элемента в очереди.
Я думаю, что я смогу сделать это с помощью библиотеки потоков данных TPL со связыванием, отсрочкой и освобождением отсрочки, но я не уверен, какие конструкции использовать.
В псевдокоде:
public class Item
{
public string Name { get; set; }
public List<string> DependsOn = new List<string>();
}
ActionBlock<Item> block = null;
var block = new ActionBlock<Item>(o => {
if (!HasActionBlockProcessedAllDependencies(o.DependsOn))
{
// enqueue a callback when ALL dependencies have been completed
}
else
{
DoWork(o);
}
},
new ExecutionDataflowBlockOptions {
MaxDegreeOfParallelism = resourceProcessorOptions.MaximumProviderConcurrency
});
var items = new[]
{
new Item { Name = "Apple", DependsOn = { "Pear" } },
new Item { Name = "Pear" }
}
Комментарии:
1. Итак, вы не хотите обрабатывать элемент, если не были обработаны другие элементы, от которых он зависит?
2. Да, и я хочу повторно активировать элемент, как только все его зависимости будут удовлетворены наиболее эффективным из возможных способов.
3. Это в более крупном конвейере? Причина, по которой я спрашиваю, заключается в том, что здесь мало что решается с потоком данных. Хотя это и не невозможно, кажется, что вы просто хотите сохранить словарь зависимостей с зависимыми, и когда вы обрабатываете зависимость, публикуете их зависимые
4. да, это большой конвейер, и я хочу обрабатывать 10 элементов одновременно
5. Связывание здесь вам не поможет. Существует несколько подходов, более сложный подход заключается в создании пользовательского блока на основе словаря очередей. однако, возможно, было бы проще не отправлять в первый блок, если он не был обработан, с аналогичной структурой относительно того, что было объяснено
Ответ №1:
Я не уверен, будет ли это вам полезно, но вот пользовательский DependencyTransformBlock
класс, который знает о зависимостях между получаемыми элементами и обрабатывает каждый из них только после успешной обработки его зависимостей. Этот пользовательский блок поддерживает все встроенные функции обычного TransformBlock
, за исключением EnsureOrdered
опции.
Конструкторы этого класса принимают Func<TInput, TKey>
лямбда-выражение для извлечения ключа каждого элемента и Func<TInput, IReadOnlyCollection<TKey>>
лямбда-выражение для извлечения его зависимостей. Ожидается, что ключи будут уникальными. В случае обнаружения дубликата ключа блок завершится сбоем.
В случае циклических зависимостей между элементами затронутые элементы останутся необработанными. Свойство TInput[] Unprocessed
позволяет извлекать необработанные элементы после завершения блока. Элемент также может оставаться необработанным, если какая-либо из его зависимостей не указана.
public class DependencyTransformBlock<TInput, TKey, TOutput> :
ITargetBlock<TInput>, ISourceBlock<TOutput>
{
private readonly ITargetBlock<TInput> _inputBlock;
private readonly IPropagatorBlock<Item, TOutput> _transformBlock;
private readonly object _locker = new object();
private readonly Dictionary<TKey, Item> _items;
private int _pendingCount = 1;
// The initial 1 represents the completion of the _inputBlock
private class Item
{
public TKey Key;
public TInput Input;
public bool HasInput;
public bool IsCompleted;
public HashSet<Item> Dependencies;
public HashSet<Item> Dependents;
public Item(TKey key) => Key = key;
}
public DependencyTransformBlock(
Func<TInput, Task<TOutput>> transform,
Func<TInput, TKey> keySelector,
Func<TInput, IReadOnlyCollection<TKey>> dependenciesSelector,
ExecutionDataflowBlockOptions dataflowBlockOptions = null,
IEqualityComparer<TKey> keyComparer = null)
{
if (transform == null)
throw new ArgumentNullException(nameof(transform));
if (keySelector == null)
throw new ArgumentNullException(nameof(keySelector));
if (dependenciesSelector == null)
throw new ArgumentNullException(nameof(dependenciesSelector));
dataflowBlockOptions =
dataflowBlockOptions ?? new ExecutionDataflowBlockOptions();
keyComparer = keyComparer ?? EqualityComparer<TKey>.Defau<
_items = new Dictionary<TKey, Item>(keyComparer);
_inputBlock = new ActionBlock<TInput>(async input =>
{
var key = keySelector(input);
var dependencyKeys = dependenciesSelector(input);
bool isReadyForProcessing = true;
Item item;
lock (_locker)
{
if (!_items.TryGetValue(key, out item))
{
item = new Item(key);
_items.Add(key, item);
}
if (item.HasInput)
throw new InvalidOperationException($"Duplicate key ({key}).");
item.Input = input;
item.HasInput = true;
if (dependencyKeys != null amp;amp; dependencyKeys.Count > 0)
{
item.Dependencies = new HashSet<Item>();
foreach (var dependencyKey in dependencyKeys)
{
if (!_items.TryGetValue(dependencyKey, out var dependency))
{
dependency = new Item(dependencyKey);
_items.Add(dependencyKey, dependency);
}
if (!dependency.IsCompleted)
{
item.Dependencies.Add(dependency);
if (dependency.Dependents == null)
dependency.Dependents = new HashSet<Item>();
dependency.Dependents.Add(item);
}
}
isReadyForProcessing = item.Dependencies.Count == 0;
}
if (isReadyForProcessing) _pendingCount ;
}
if (isReadyForProcessing)
{
await _transformBlock.SendAsync(item);
}
}, new ExecutionDataflowBlockOptions()
{
CancellationToken = dataflowBlockOptions.CancellationToken,
BoundedCapacity = 1
});
var middleBuffer = new BufferBlock<Item>(new DataflowBlockOptions()
{
CancellationToken = dataflowBlockOptions.CancellationToken,
BoundedCapacity = DataflowBlockOptions.Unbounded
});
_transformBlock = new TransformBlock<Item, TOutput>(async item =>
{
try
{
TInput input;
lock (_locker)
{
Debug.Assert(item.HasInput amp;amp; !item.IsCompleted);
input = item.Input;
}
var result = await transform(input).ConfigureAwait(false);
lock (_locker)
{
item.IsCompleted = true;
if (item.Dependents != null)
{
foreach (var dependent in item.Dependents)
{
Debug.Assert(dependent.Dependencies != null);
var removed = dependent.Dependencies.Remove(item);
Debug.Assert(removed);
if (dependent.HasInput
amp;amp; dependent.Dependencies.Count == 0)
{
middleBuffer.Post(dependent);
_pendingCount ;
}
}
}
item.Input = default; // Cleanup
item.Dependencies = null;
item.Dependents = null;
}
return resu<
}
finally
{
lock (_locker)
{
_pendingCount--;
if (_pendingCount == 0) middleBuffer.Complete();
}
}
}, dataflowBlockOptions);
middleBuffer.LinkTo(_transformBlock);
PropagateCompletion(_inputBlock, middleBuffer,
condition: () => { lock (_locker) return --_pendingCount == 0; });
PropagateCompletion(middleBuffer, _transformBlock);
PropagateFailure(_transformBlock, middleBuffer);
PropagateFailure(_transformBlock, _inputBlock);
}
// Constructor with synchronous lambda
public DependencyTransformBlock(
Func<TInput, TOutput> transform,
Func<TInput, TKey> keySelector,
Func<TInput, IReadOnlyCollection<TKey>> dependenciesSelector,
ExecutionDataflowBlockOptions dataflowBlockOptions = null,
IEqualityComparer<TKey> keyComparer = null) : this(
input => Task.FromResult(transform(input)),
keySelector, dependenciesSelector, dataflowBlockOptions, keyComparer)
{
if (transform == null) throw new ArgumentNullException(nameof(transform));
}
public TInput[] Unprocessed
{
get
{
lock (_locker) return _items.Values
.Where(item => item.HasInput amp;amp; !item.IsCompleted)
.Select(item => item.Input)
.ToArray();
}
}
public Task Completion => _transformBlock.Completion;
public void Complete() => _inputBlock.Complete();
void IDataflowBlock.Fault(Exception ex) => _inputBlock.Fault(ex);
DataflowMessageStatus ITargetBlock<TInput>.OfferMessage(
DataflowMessageHeader header, TInput value, ISourceBlock<TInput> source,
bool consumeToAccept)
{
return _inputBlock.OfferMessage(header, value, source, consumeToAccept);
}
TOutput ISourceBlock<TOutput>.ConsumeMessage(DataflowMessageHeader header,
ITargetBlock<TOutput> target, out bool messageConsumed)
{
return _transformBlock.ConsumeMessage(header, target, out messageConsumed);
}
bool ISourceBlock<TOutput>.ReserveMessage(DataflowMessageHeader header,
ITargetBlock<TOutput> target)
{
return _transformBlock.ReserveMessage(header, target);
}
void ISourceBlock<TOutput>.ReleaseReservation(DataflowMessageHeader header,
ITargetBlock<TOutput> target)
{
_transformBlock.ReleaseReservation(header, target);
}
public IDisposable LinkTo(ITargetBlock<TOutput> target,
DataflowLinkOptions linkOptions)
{
return _transformBlock.LinkTo(target, linkOptions);
}
private async void PropagateCompletion(IDataflowBlock source,
IDataflowBlock target, Func<bool> condition = null)
{
try { await source.Completion.ConfigureAwait(false); } catch { }
if (source.Completion.IsFaulted)
target.Fault(source.Completion.Exception.InnerException);
else
if (condition == null || condition()) target.Complete();
}
private async void PropagateFailure(IDataflowBlock source,
IDataflowBlock target)
{
try { await source.Completion.ConfigureAwait(false); } catch { }
if (source.Completion.IsFaulted)
target.Fault(source.Completion.Exception.InnerException);
}
}
Пример использования:
var block = new DependencyTransformBlock<Item, string, Item>(item =>
{
DoWork(item);
return item;
},
keySelector: item => item.Name,
dependenciesSelector: item => item.DependsOn,
new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = Environment.ProcessorCount
},
keyComparer: StringComparer.OrdinalIgnoreCase);
//...
block.LinkTo(DataflowBlock.NullTarget<Item>());
В этом примере блок связан с a NullTarget
, чтобы отменить его вывод, так что он становится по существу ActionBlock
эквивалентом.