Шаблон производителя / потребителя с пакетным производителем

#c# #multithreading #thread-safety #task-parallel-library #task

#c# #многопоточность #безопасность потоков #задача-параллельная-библиотека #задача

Вопрос:

Я пытаюсь реализовать довольно простое приложение в стиле производителя / потребителя с несколькими производителями и одним потребителем.

Исследование привело меня к BlockingCollection<T> тому, что является полезным и позволило мне реализовать долгосрочную задачу потребителя, как показано ниже:

 var c1 = Task.Factory.StartNew(() =>
{
    var buffer = new List<int>(BATCH_BUFFER_SIZE);

    foreach (var value in blockingCollection.GetConsumingEnumerable())
    {
        buffer.Add(value);
        if (buffer.Count == BATCH_BUFFER_SIZE)
        {
            ProcessItems(buffer);
            buffer.Clear();
        }
    }
});
 

ProcessItems Функция отправляет буфер в базу данных и работает пакетно. Однако это решение не является оптимальным. В периоды производства baron может пройти некоторое время, пока буфер не будет заполнен, что означает, что база данных устарела.

Более идеальным решением было бы либо запустить задачу по 30-секундному таймеру, либо замкнуть foreach ее с таймаутом.

Я работал с идеей таймера и придумал это:

 syncTimer = new Timer(new TimerCallback(TimerElapsed), blockingCollection, 5000, 5000);

private static void TimerElapsed(object state)
{
    var buffer = new List<int>();
    var collection = ((BlockingCollection<int>)state).GetConsumingEnumerable();

    foreach (var value in collection)
    {
        buffer.Add(value);
    }

    ProcessItems(buffer);
    buffer.Clear();
}
 

У этого есть явная проблема, заключающаяся в том, что foreach он будет заблокирован до конца, что противоречит цели таймера.

Кто-нибудь может предложить направление? По сути, мне нужно BlockingCollection периодически делать снимок и обрабатывать содержимое, чтобы очистить его. Возможно, a BlockingCollection — неправильный тип?

Ответ №1:

Вместо использования GetConsumingEnumerable в обратном вызове таймера используйте один из этих методов, добавляя результаты в список, пока он не вернется false , или вы не достигли удовлетворительного размера пакета.

Блокирующая коллекция.Метод TryTake (T) — вероятно, то, что вам нужно, вы вообще не хотите выполнять дальнейшее ожидание.

Блокирующая коллекция.Метод TryTake (T, Int32)

Блокирующая коллекция.Метод TryTake (T, TimeSpan)

Вы можете легко извлечь это в расширение (непроверенное):

 public static IList<T> Flush<T>
(this BlockingCollection<T> collection, int maxSize = int.MaxValue)
{
     // Argument checking.

     T next;
     var result = new List<T>();

     while(result.Count < maxSize amp;amp; collection.TryTake(out next))
     {
         result.Add(next);
     }

     return resu<
}
 

Комментарии:

1. Я пошел с этой реализацией, которая работает отлично. Он запускается каждые x секунд и отправляет только то, что доступно (без ожидания). Спасибо за вашу помощь. gist.github.com/AntSwift/d2faa4f43d1a6d594172

2. Полная рабочая версия теперь доступна по адресу github.com/AntSwift/AS.BlockingCollectionDemo

3. Я считаю, что ваши примеры кода будут подвержены условиям гонки из-за механизма таймера. Если ваш потребитель не может выполнить задачу в течение периода таймера, таймер сработает независимо от того, что приведет к появлению другого потребителя.

4. @Andrew: Это зависит от используемой реализации таймера, которая выходит за рамки этого ответа, который касается только самой коллекции блокировок.