Ограничение количества запросов API, запускаемых в секунду с использованием Parallel.ForEach

#c# #multithreading #parallel.foreach

#c# #многопоточность #parallel.foreach

Вопрос:

Я работаю над улучшением части своего кода для повышения эффективности. В исходном коде я ограничивал количество разрешенных потоков до 5, и если бы у меня уже было 5 активных потоков, я бы подождал, пока один завершится, прежде чем запускать другой. Теперь я хочу изменить этот код, чтобы разрешить любое количество потоков, но я хочу иметь возможность убедиться, что только 5 потоков запускаются каждую секунду. Например:

  • Второй 0 — 5 новых потоков
  • Второй 1 — 5 новых потоков
  • Вторые 2 — 5 новых потоков …

Исходный код (cleanseDictionary обычно содержит тысячи элементов):

         ConcurrentDictionary<long, APIResponse> cleanseDictionary = new ConcurrentDictionary<long, APIResponse>();
        ConcurrentBag<int> itemsinsec = new ConcurrentBag<int>();
        ConcurrentDictionary<long, string> resourceDictionary = new ConcurrentDictionary<long, string>();
        DateTime start = DateTime.Now;

        Parallel.ForEach(resourceDictionary, new ParallelOptions { MaxDegreeOfParallelism = 5 }, row =>
        {
            lock (itemsinsec)
            {
                ThrottleAPIRequests(itemsinsec, start);

                itemsinsec.Add(1);
            }

            cleanseDictionary.TryAdd(row.Key, _helper.MakeAPIRequest(string.Format("/endpoint?{0}", row.Value)));
        });


    private static void ThrottleAPIRequests(ConcurrentBag<int> itemsinsec, DateTime start)
    {
        if ((start - DateTime.Now).Milliseconds < 10001 amp;amp; itemsinsec.Count > 4)
        {
            System.Threading.Thread.Sleep(1000 - (start - DateTime.Now).Milliseconds);
            start = DateTime.Now;
            itemsinsec = new ConcurrentBag<int>();
        }
    }
 

Моей первой мыслью было увеличить значение MaxDegreeofParallelism до чего-то намного большего, а затем использовать вспомогательный метод, который будет ограничивать только 5 потоков в секунду, но я не уверен, что это лучший способ сделать это, и если это так, мне, вероятно, понадобится lock обойти этот шаг?

Заранее спасибо!

РЕДАКТИРОВАТЬ Я на самом деле ищу способ ограничить запросы API, а не фактические потоки. Я думал, что они были одним и тем же.

Редактировать 2: мои требования — отправлять более 5 запросов API каждую секунду

Комментарии:

1. Parallel.ForEach не запускает новые потоки. Он использует несколько задач для разделения большого объема данных, и каждая задача работает исключительно с этими данными. Что вы пытаетесь сделать с этим кодом и почему вы пытаетесь «дросселировать», когда у вас максимум 5 одновременных вызовов?

2. @PanagiotisKanavos это требование, которое установил хост, в любую секунду было разрешено только 5 (так я думал), но теперь я узнал, что их счетчик сбрасывается после каждого второго прохождения.

3. Похоже, вы пытаетесь ограничить запросы , а не потоки. Каковы ваши фактические требования? Выполнить, например, до 5 одновременных запросов или 5 запросов в секунду?

4. @PanagiotisKanavos да, я думаю, это более понятно — я хочу ограничить запросы, чтобы было только 5 ЗАПРОСОВ в секунду. Хотя я хочу, чтобы было как можно больше запросов

5. Единственные, кому разрешено отклонять этот вопрос, — это те, кому в прошлом приходилось регулировать запросы! Только они понимают, насколько это запутанно, если вы не знаете методов!

Ответ №1:

«Параллельно.ForEach» с веб-сайта MS

может выполняться параллельно

Если вам нужна какая-либо степень точного контроля над тем, как управляются потоки, это не так.
Как насчет создания собственного вспомогательного класса, в котором вы можете ставить задания в очередь с идентификатором группы, который позволяет вам ждать завершения всех заданий с идентификатором группы X, и он порождает дополнительные потоки по мере необходимости?

Комментарии:

1. @PanagiotisKanavos кто знает — при повторном прочтении вопроса внезапно становится менее ясно, в чем на самом деле заключается вопрос.

Ответ №2:

Для меня лучшим решением является:

 using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

namespace SomeNamespace
{
    public class RequestLimiter : IRequestLimiter
    {
        private readonly ConcurrentQueue<DateTime> _requestTimes;
        private readonly TimeSpan _timeSpan;

        private readonly object _locker = new object();

        public RequestLimiter()
        {
            _timeSpan = TimeSpan.FromSeconds(1);
            _requestTimes = new ConcurrentQueue<DateTime>();
        }

        public TResult Run<TResult>(int requestsOnSecond, Func<TResult> function)
        {
            WaitUntilRequestCanBeMade(requestsOnSecond).Wait();
            return function();
        }

        private Task WaitUntilRequestCanBeMade(int requestsOnSecond)
        {
            return Task.Factory.StartNew(() =>
            {
                while (!TryEnqueueRequest(requestsOnSecond).Result) ;
            });
        }

        private Task SynchronizeQueue()
        {
            return Task.Factory.StartNew(() =>
            {
                _requestTimes.TryPeek(out var first);

                while (_requestTimes.Count > 0 amp;amp; (first.Add(_timeSpan) < DateTime.UtcNow))
                    _requestTimes.TryDequeue(out _);
            });
        }

        private Task<bool> TryEnqueueRequest(int requestsOnSecond)
        {
            lock (_locker)
            {
                SynchronizeQueue().Wait();
                if (_requestTimes.Count < requestsOnSecond)
                {
                    _requestTimes.Enqueue(DateTime.UtcNow);
                    return Task.FromResult(true);
                }
                return Task.FromResult(false);
            }
        }
    }
}
 

Комментарии:

1. Пожалуйста, добавьте некоторые пояснения, а не просто вставляйте код.

2. Зачем для каждого запуска нужен requestsOnSecond параметр?

Ответ №3:

Я хочу иметь возможность отправлять более 5 запросов API каждую секунду

Это действительно просто:

 while (true) {
 await Task.Delay(TimeSpan.FromSeconds(1));
 await Task.WhenAll(Enumerable.Range(0, 5).Select(_ => RunRequestAsync()));
}
 

Возможно, это не лучший подход, поскольку будет поток запросов. Это не является непрерывным.

Кроме того, существует перекос во времени. Одна итерация занимает более 1 секунды. Это можно решить с помощью нескольких строк временной логики.

Комментарии:

1. Поэтому я мог бы просто выполнять цикл, пока есть запросы для отправки. Что вы имеете в виду, что это не является непрерывным?

2. Раз в секунду происходит пакет запросов. Если бы вы хотели отправлять 1 млн запросов один раз в час, это было бы проблемой. При 5 каждые 1 с, вероятно, нет.

3. Обычное количество, которое я получаю, составляет от 500 до 5000.. будет ли это проблемой здесь? Я ищу эффективное решение для получения 5 запросов каждую секунду (или как можно ближе к нему)

4. О каком «количестве» вы говорите?

5. количество запросов API, которые мне нужно отправить