#c# #multithreading #parallel.foreach
#c# #многопоточность #parallel.foreach
Вопрос:
Я работаю над улучшением части своего кода для повышения эффективности. В исходном коде я ограничивал количество разрешенных потоков до 5, и если бы у меня уже было 5 активных потоков, я бы подождал, пока один завершится, прежде чем запускать другой. Теперь я хочу изменить этот код, чтобы разрешить любое количество потоков, но я хочу иметь возможность убедиться, что только 5 потоков запускаются каждую секунду. Например:
- Второй 0 — 5 новых потоков
- Второй 1 — 5 новых потоков
- Вторые 2 — 5 новых потоков …
Исходный код (cleanseDictionary обычно содержит тысячи элементов):
ConcurrentDictionary<long, APIResponse> cleanseDictionary = new ConcurrentDictionary<long, APIResponse>();
ConcurrentBag<int> itemsinsec = new ConcurrentBag<int>();
ConcurrentDictionary<long, string> resourceDictionary = new ConcurrentDictionary<long, string>();
DateTime start = DateTime.Now;
Parallel.ForEach(resourceDictionary, new ParallelOptions { MaxDegreeOfParallelism = 5 }, row =>
{
lock (itemsinsec)
{
ThrottleAPIRequests(itemsinsec, start);
itemsinsec.Add(1);
}
cleanseDictionary.TryAdd(row.Key, _helper.MakeAPIRequest(string.Format("/endpoint?{0}", row.Value)));
});
private static void ThrottleAPIRequests(ConcurrentBag<int> itemsinsec, DateTime start)
{
if ((start - DateTime.Now).Milliseconds < 10001 amp;amp; itemsinsec.Count > 4)
{
System.Threading.Thread.Sleep(1000 - (start - DateTime.Now).Milliseconds);
start = DateTime.Now;
itemsinsec = new ConcurrentBag<int>();
}
}
Моей первой мыслью было увеличить значение MaxDegreeofParallelism
до чего-то намного большего, а затем использовать вспомогательный метод, который будет ограничивать только 5 потоков в секунду, но я не уверен, что это лучший способ сделать это, и если это так, мне, вероятно, понадобится lock
обойти этот шаг?
Заранее спасибо!
РЕДАКТИРОВАТЬ Я на самом деле ищу способ ограничить запросы API, а не фактические потоки. Я думал, что они были одним и тем же.
Редактировать 2: мои требования — отправлять более 5 запросов API каждую секунду
Комментарии:
1.
Parallel.ForEach
не запускает новые потоки. Он использует несколько задач для разделения большого объема данных, и каждая задача работает исключительно с этими данными. Что вы пытаетесь сделать с этим кодом и почему вы пытаетесь «дросселировать», когда у вас максимум 5 одновременных вызовов?2. @PanagiotisKanavos это требование, которое установил хост, в любую секунду было разрешено только 5 (так я думал), но теперь я узнал, что их счетчик сбрасывается после каждого второго прохождения.
3. Похоже, вы пытаетесь ограничить запросы , а не потоки. Каковы ваши фактические требования? Выполнить, например, до 5 одновременных запросов или 5 запросов в секунду?
4. @PanagiotisKanavos да, я думаю, это более понятно — я хочу ограничить запросы, чтобы было только 5 ЗАПРОСОВ в секунду. Хотя я хочу, чтобы было как можно больше запросов
5. Единственные, кому разрешено отклонять этот вопрос, — это те, кому в прошлом приходилось регулировать запросы! Только они понимают, насколько это запутанно, если вы не знаете методов!
Ответ №1:
«Параллельно.ForEach» с веб-сайта MS
может выполняться параллельно
Если вам нужна какая-либо степень точного контроля над тем, как управляются потоки, это не так.
Как насчет создания собственного вспомогательного класса, в котором вы можете ставить задания в очередь с идентификатором группы, который позволяет вам ждать завершения всех заданий с идентификатором группы X, и он порождает дополнительные потоки по мере необходимости?
Комментарии:
1. @PanagiotisKanavos кто знает — при повторном прочтении вопроса внезапно становится менее ясно, в чем на самом деле заключается вопрос.
Ответ №2:
Для меня лучшим решением является:
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;
namespace SomeNamespace
{
public class RequestLimiter : IRequestLimiter
{
private readonly ConcurrentQueue<DateTime> _requestTimes;
private readonly TimeSpan _timeSpan;
private readonly object _locker = new object();
public RequestLimiter()
{
_timeSpan = TimeSpan.FromSeconds(1);
_requestTimes = new ConcurrentQueue<DateTime>();
}
public TResult Run<TResult>(int requestsOnSecond, Func<TResult> function)
{
WaitUntilRequestCanBeMade(requestsOnSecond).Wait();
return function();
}
private Task WaitUntilRequestCanBeMade(int requestsOnSecond)
{
return Task.Factory.StartNew(() =>
{
while (!TryEnqueueRequest(requestsOnSecond).Result) ;
});
}
private Task SynchronizeQueue()
{
return Task.Factory.StartNew(() =>
{
_requestTimes.TryPeek(out var first);
while (_requestTimes.Count > 0 amp;amp; (first.Add(_timeSpan) < DateTime.UtcNow))
_requestTimes.TryDequeue(out _);
});
}
private Task<bool> TryEnqueueRequest(int requestsOnSecond)
{
lock (_locker)
{
SynchronizeQueue().Wait();
if (_requestTimes.Count < requestsOnSecond)
{
_requestTimes.Enqueue(DateTime.UtcNow);
return Task.FromResult(true);
}
return Task.FromResult(false);
}
}
}
}
Комментарии:
1. Пожалуйста, добавьте некоторые пояснения, а не просто вставляйте код.
2. Зачем для каждого запуска нужен
requestsOnSecond
параметр?
Ответ №3:
Я хочу иметь возможность отправлять более 5 запросов API каждую секунду
Это действительно просто:
while (true) {
await Task.Delay(TimeSpan.FromSeconds(1));
await Task.WhenAll(Enumerable.Range(0, 5).Select(_ => RunRequestAsync()));
}
Возможно, это не лучший подход, поскольку будет поток запросов. Это не является непрерывным.
Кроме того, существует перекос во времени. Одна итерация занимает более 1 секунды. Это можно решить с помощью нескольких строк временной логики.
Комментарии:
1. Поэтому я мог бы просто выполнять цикл, пока есть запросы для отправки. Что вы имеете в виду, что это не является непрерывным?
2. Раз в секунду происходит пакет запросов. Если бы вы хотели отправлять 1 млн запросов один раз в час, это было бы проблемой. При 5 каждые 1 с, вероятно, нет.
3. Обычное количество, которое я получаю, составляет от 500 до 5000.. будет ли это проблемой здесь? Я ищу эффективное решение для получения 5 запросов каждую секунду (или как можно ближе к нему)
4. О каком «количестве» вы говорите?
5. количество запросов API, которые мне нужно отправить