#c# #multithreading #task #dataflow
#c# #многопоточность #задача #поток данных
Вопрос:
У меня есть набор работников, с которыми я хотел бы обработать некоторые данные, чтобы данные были добавлены в очередь (буферный блок). После того, как рабочий обработает данные, я бы хотел, чтобы он вернул рабочий элемент в очередь. Я написал программу ниже, но похоже, что рабочие не получают данные равномерно, есть предложения о том, как это исправить? Правильно ли я использую этот шаблон потоков?
например, в выходных данных я вижу
рабочий 1, работа 66, количество заданий 32651, сумма 2154966
рабочий 0, работа 16, количество заданий 32637, сумма 522192
рабочий 1, работа 61, количество заданий 32675, сумма 1993175
рабочий 0, работа 72, количество заданий 32649, сумма 2350728
рабочий 1, работа 95, количество заданий 32688, сумма 3105360
рабочий 0, работа 86, количество заданий 32663, сумма 2809018
рабочий 1, работа 0, количество заданий 32649, сумма 0
< — этого не ожидалось
рабочий 0, работа 98, количество заданий 32673, сумма 3201954
рабочий 1, работа 74, количество заданий 32649, сумма 2416026
рабочий 0, работа 93, количество заданий 32675, сумма 3038775
рабочий 1, работа 7, количество заданий 32702, сумма 228914
рабочий 0, работа 42, количество заданий 32642, сумма 1370964
рабочий 1, работа 32, количество заданий 32708, сумма 1046656
рабочий 0, работа 99, количество заданий 32693, сумма 3236607
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
using System.Threading;
using System.Collections.Generic;
namespace testworkers
{
class WorkerArguments
{
public int i;
public int sum;
public int numjobs;
public WorkerArguments()
{
}
}
class Program
{
static void Main(string[] args)
{
var cancelSignal = new CancellationTokenSource();
var buffer = new BufferBlock<WorkerArguments>(new DataflowBlockOptions() { BoundedCapacity = 5000, CancellationToken = cancelSignal.Token });
List<string> splunkProdUsers = new List<string>(new string[] { "worker1", "worker2", "worker3", "worker4" });
List<Task<int>> consumers = new List<Task<int>>();
for (int i = 0; i < splunkProdUsers.Count; i )
{
var consumer = Worker(i, buffer, buffer, cancelSignal.Token);
consumers.Add(consumer);
}
for (int j = 0; j < 100; j )
{
buffer.Post(new WorkerArguments { i = j });
}
for (int i = 0; i < consumers.Count; i )
{
consumers[i].Wait(cancelSignal.Token);
}
}
static async Task<int> Worker(int workerId, ISourceBlock<WorkerArguments> source, ITargetBlock<WorkerArguments> target, CancellationToken cancelToken)
{
int workProcessed = 0;
while (await source.OutputAvailableAsync())
{
WorkerArguments workArguments = source.Receive(cancelToken);
Console.WriteLine("worker {0}, work {1}, numjobs {2}, sum {3}", workerId, workArguments.i, workArguments.numjobs, workArguments.sum);
WorkerArguments nextWorkArguments = new WorkerArguments();
nextWorkArguments.i = workArguments.i;
nextWorkArguments.sum = workArguments.sum workArguments.i;
nextWorkArguments.numjobs = workArguments.numjobs 1;
target.Post(nextWorkArguments);
}
return workProcessed;
}
}
}
Комментарии:
1. Если все ваши работники одинаковы, я бы рекомендовал использовать
ActionBlock<T>
, сMaxDegreeOfParallelism
набором.2. в чем преимущество использования ActionBlock по сравнению с тем, что у меня есть в настоящее время?
3. Он имеет встроенную поддержку разделения. Нет разделения с
Receive
; какой бы работник ни вызывал его первым, он получает элемент.