Как вы запускаете переменное количество параллельных параметризуемых потоков типа бесконечного цикла в C #?

#c# #.net #multithreading #azure #azure-service-fabric

#c# #.net #многопоточность #azure #azure-service-fabric

Вопрос:

Я создаю свое первое многопоточное приложение на основе C # / .NET, которое будет работать в кластере Azure Service Fabric. Как следует из названия, я хочу запустить переменное количество параллельных параметризуемых потоков типа бесконечного цикла, которые будут использовать метод RunAsync.

Каждый дочерний поток выглядит примерно так:

         public async Task childThreadCall(...argument list...)
        {
            while (true)
            {
                try
                {
                    //long running code
                    //do something useful here
                    //sleep for an independently parameterizable period, then wake up and repeat
                }
                catch (Exception e)
                {
                    //Exception Handling
                }
            }
        }
 

Существует переменное количество таких дочерних потоков, которые вызываются в методе RunAsync. Я хочу сделать что-то вроде этого:

         protected override async Task RunAsync(CancellationToken cancellationToken)
        {
            try
            {
                for (int i = 0; i < input.length; i  )
                {
                    ThreadStart ts[i] = new ThreadStart(childThreadCall(...argument list...));
                    Thread tc[i] = new Thread(ts);
                    tc[i].Start();
                }
            }
            catch (Exception e)
            {
                //Exception Handling
            }
        }

 

Таким образом, в основном каждый из дочерних потоков выполняется независимо от других и продолжает делать это вечно. Возможно ли это сделать? Может ли кто-нибудь указать мне правильное направление? Есть ли какие-либо подводные камни, о которых следует знать?

Комментарии:

1. Я бы держался подальше от класса Thread, если у вас нет конкретного варианта использования для него

2. Как долго это долго? О какой работе мы говорим, связан ли она с вводом-выводом?

3. @PeterBons Работа каждого независимого потока в основном заключается в запросе кластера Azure Data Explorer, ожидании результатов, анализе результатов и сохранении их в хранилище больших двоичных объектов Azure. Затем ожидание заданного интервала (скажем, 1-2 часа), а затем перезапуск этого процесса. Это будет продолжаться вечно, пока приложение не будет развернуто в кластере Service Fabric. Обратите внимание, что период ожидания для каждого потока устанавливается независимо.

4. @PeterBons Обратите внимание, что мне не нужно синхронизировать для завершения потока. Каждый может выполнять свою работу после правильного запуска.

Ответ №1:

RunAsync Метод вызывается при запуске службы. Так что да, его можно использовать для выполнения того, что вы хотите. Я предлагаю использовать задачи, поскольку они хорошо сочетаются с токеном отмены. Вот черновик:

 protected override async Task RunAsync(CancellationToken cancellationToken)
{
    var tasks = new List<Task>();
    try
    {
        for (int i = 0; i < input.length; i  )
        {
            tasks.Add(MyTask(cancellationToken, i);
        }
        
        await Task.WhenAll(tasks);
    }
    catch (Exception e)
    {
        //Exception Handling
    }
}

public async Task MyTask(CancellationToken cancellationToken, int a)
{
    while (true)
    {
        cancellationToken.ThrowIfCancellationRequested();

        try
        {
            //long running code, if possible check for cancellation using the token
            //do something useful here
            await SomeUseFullTask(cancellationToken);
            
            //sleep for an independently parameterizable period, then wake up and repeat
            await Task.Delay(TimeSpan.FromHours(1), cancellationToken);
        }
        catch (Exception e)
        {
            //Exception Handling
        }
    }
}
 

Что касается подводных камней, есть хороший список вещей, о которых нужно думать в целом при использовании задач.

Имейте в виду, что задачи лучше всего подходят для работы с вводом-выводом. Если вы можете опубликовать, что именно делается в длительном процессе, пожалуйста, сделайте это, тогда я, возможно, смогу улучшить ответ, чтобы он наилучшим образом соответствовал вашему варианту использования.

Важно учитывать токен отмены, переданный методу RunAsync, поскольку он указывает, что служба собирается остановиться. Это дает вам возможность изящно остановить вашу работу. Из документов:

Убедитесь, что CancellationToken, переданный RunAsync (CancellationToken), соблюден, и как только он был сигнализирован, RunAsync (CancellationToken) завершается корректно как можно скорее. Пожалуйста, обратите внимание, что если RunAsync(CancellationToken) завершил свою предполагаемую работу, ему не нужно ждать сигнала CancellationToken и он может корректно вернуться.

Как вы можете видеть в моем коде, я передаю CancellationToken дочерним методам, чтобы они могли реагировать на возможную отмену. В вашем случае произойдет отмена из-за бесконечного цикла.

Комментарии:

1. Следует отметить, что объединение while (!cancellationToken.IsCancellationRequested) с await Task.Delay(..., cancellationToken) может привести к непоследовательному поведению отмены. Когда токен отменяется, результат может быть либо исключительным, либо нет. Мое мнение таково, что IsCancellationRequested свойство следует использовать редко, если вообще когда-либо. ThrowIfCancellationRequested Вместо этого следует использовать метод, чтобы гарантировать, что отмена всегда является исключительной.

2. @TheodorZoulias да, вы правы, лучше бросить. Обновил мой ответ.

3. Если вы хотите убедиться, что каждый из MyTask экземпляров запускается как можно скорее, вы могли бы рассмотреть возможность ввода await Task.Yield() начала, иначе выполнение будет выполняться синхронно до тех пор, пока не будет выполнен первый асинхронный вызов.

4. @DanielLerps await Task.Yield() делает то, что вы хотите, только при отсутствии контекста синхронизации. И это происходит случайно, потому что он специально разработан для использования в средах с установленным контекстом синхронизации. По этой причине я считаю это взломом, и я предлагаю вместо этого использовать Task.Run метод, который не зависит от контекста механизм для разгрузки работы на ThreadPool .

5. @PeterBons Спасибо за этот ответ. Это очень близко к тому, что мне нужно. Однако у меня есть следующий вопрос: что, если, скажем, у меня есть 20 задач, но есть ограничение, скажем, 5 на параллелизм SomeUseFullTask() . То есть SomeUseFullTask() очень сильно нагружает некоторые ограниченные ресурсы вычислений / ввода-вывода, и поэтому его нужно запускать только тогда, когда он может получить один из 5 токенов. Если токенов нет, он ожидает, пока токен не будет доступен. Возможно ли реализовать такую вещь?