#c# #multithreading #thread-safety #task-parallel-library #task
#c# #многопоточность #безопасность потоков #задача-параллельная-библиотека #задача
Вопрос:
Я пытаюсь реализовать довольно простое приложение в стиле производителя / потребителя с несколькими производителями и одним потребителем.
Исследование привело меня к BlockingCollection<T>
тому, что является полезным и позволило мне реализовать долгосрочную задачу потребителя, как показано ниже:
var c1 = Task.Factory.StartNew(() =>
{
var buffer = new List<int>(BATCH_BUFFER_SIZE);
foreach (var value in blockingCollection.GetConsumingEnumerable())
{
buffer.Add(value);
if (buffer.Count == BATCH_BUFFER_SIZE)
{
ProcessItems(buffer);
buffer.Clear();
}
}
});
ProcessItems
Функция отправляет буфер в базу данных и работает пакетно. Однако это решение не является оптимальным. В периоды производства baron может пройти некоторое время, пока буфер не будет заполнен, что означает, что база данных устарела.
Более идеальным решением было бы либо запустить задачу по 30-секундному таймеру, либо замкнуть foreach
ее с таймаутом.
Я работал с идеей таймера и придумал это:
syncTimer = new Timer(new TimerCallback(TimerElapsed), blockingCollection, 5000, 5000);
private static void TimerElapsed(object state)
{
var buffer = new List<int>();
var collection = ((BlockingCollection<int>)state).GetConsumingEnumerable();
foreach (var value in collection)
{
buffer.Add(value);
}
ProcessItems(buffer);
buffer.Clear();
}
У этого есть явная проблема, заключающаяся в том, что foreach
он будет заблокирован до конца, что противоречит цели таймера.
Кто-нибудь может предложить направление? По сути, мне нужно BlockingCollection
периодически делать снимок и обрабатывать содержимое, чтобы очистить его. Возможно, a BlockingCollection
— неправильный тип?
Ответ №1:
Вместо использования GetConsumingEnumerable
в обратном вызове таймера используйте один из этих методов, добавляя результаты в список, пока он не вернется false
, или вы не достигли удовлетворительного размера пакета.
Блокирующая коллекция.Метод TryTake (T) — вероятно, то, что вам нужно, вы вообще не хотите выполнять дальнейшее ожидание.
Блокирующая коллекция.Метод TryTake (T, Int32)
Блокирующая коллекция.Метод TryTake (T, TimeSpan)
Вы можете легко извлечь это в расширение (непроверенное):
public static IList<T> Flush<T>
(this BlockingCollection<T> collection, int maxSize = int.MaxValue)
{
// Argument checking.
T next;
var result = new List<T>();
while(result.Count < maxSize amp;amp; collection.TryTake(out next))
{
result.Add(next);
}
return resu<
}
Комментарии:
1. Я пошел с этой реализацией, которая работает отлично. Он запускается каждые x секунд и отправляет только то, что доступно (без ожидания). Спасибо за вашу помощь. gist.github.com/AntSwift/d2faa4f43d1a6d594172
2. Полная рабочая версия теперь доступна по адресу github.com/AntSwift/AS.BlockingCollectionDemo
3. Я считаю, что ваши примеры кода будут подвержены условиям гонки из-за механизма таймера. Если ваш потребитель не может выполнить задачу в течение периода таймера, таймер сработает независимо от того, что приведет к появлению другого потребителя.
4. @Andrew: Это зависит от используемой реализации таймера, которая выходит за рамки этого ответа, который касается только самой коллекции блокировок.