#.net-core #background-service #tpl-dataflow
#.net-ядро #фоновая служба #tpl-поток данных
Вопрос:
Используя следующий код, я использую фоновую службу .net Core с непрерывной очередью заданий, где внутри ExecuteAsync имитирует задание, добавленное в очередь (это может быть сбор заказа, получение ответов на заказы и т.д.)
Фоновая служба:
public class Worker : BackgroundService
{
public WorkerJobQueue orderQueue { get; set; }
public override async Task StartAsync(CancellationToken cancellationToken)
{
Console.WriteLine("Sales_Order_Processor_Service Starting");
orderQueue = new WorkerJobQueue();
orderQueue.RegisterHandler<TestJob>(TestJobWorker.DoJob);
await base.StartAsync(cancellationToken);
}
public override async Task StopAsync(CancellationToken cancellationToken)
{
Console.WriteLine("Sales_Order_Processor_Service Stopping");
await orderQueue.EndQueue(cancellationToken);
await base.StopAsync(cancellationToken);
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
var i = 0;
while (!stoppingToken.IsCancellationRequested)
{
Console.ReadLine();
for (var j = 0; j < 50; j )
{
var tmp = new TestJob { JobNumber = i };
Console.WriteLine($"Adding job {tmp.JobNumber} to queue");
await orderQueue.Enqueue(tmp);
i ;
}
Console.WriteLine($"{orderQueue.GetNumberOfRemainingJobs()} Jobs in queue...");
}
}
}
Очередь рабочих заданий:
public class WorkerJobQueue
{
private ActionBlock<IJob> _workerBlock;
public WorkerJobQueue()
{
}
public void RegisterHandler<T>(Action<T> handleAction) where T : IJob
{
Action<IJob> actionWrapper = (job) => handleAction((T)job);
var executionDataflowBlockOptions = new ExecutionDataflowBlockOptions()
{
MaxDegreeOfParallelism = 5,
};
_workerBlock = new ActionBlock<IJob>((job) => actionWrapper(job), executionDataflowBlockOptions);
}
public async Task Enqueue(IJob job)
{
await _workerBlock.SendAsync(job);
}
public int GetNumberOfRemainingJobs()
{
return _workerBlock.InputCount;
}
public async Task EndQueue(CancellationToken stoppingToken)
{
await Task.WhenAll(_workerBlock.Completion, Task.Delay(Timeout.Infinite, stoppingToken));
}
}
Элемент задания:
public class TestJob : IJob
{
public int JobNumber { get; set; }
}
И просто для имитации некоторой работы, выполняемой заданием:
public class TestJobWorker
{
public static void DoJob(TestJob testJob)
{
var rnd = new Random();
var ranNum = rnd.Next(10);
Console.WriteLine($"Starting job {testJob.JobNumber} sleeping for {ranNum} seconds");
System.Threading.Thread.Sleep(ranNum* 1000);
Console.WriteLine($"Finished job {testJob.JobNumber}");
}
}
Очередь заданий работает так, как она должна добавлять 50 заданий в очередь при нажатии клавиши, однако всякий раз, когда служба останавливается / окно консоли закрывается, оно фактически не ожидает завершения очереди заданий и фактически не переходит в StopAsync
функцию?
Я не понял, что функция StopAsync
вызывается при закрытии / остановке службы? Или логика в моей очереди на самом деле неверна при завершении очереди?
Ответ №1:
Оказывается, StopAsync
вызывается позже, чем размещенный stop
я использовал следующий код для запуска при остановке:
private readonly IHostApplicationLifetime _hostApplicationLifetime;
public Worker(IHostApplicationLifetime hostApplicationLifetime)
{
_hostApplicationLifetime = hostApplicationLifetime;
}
private void OnStopping()
{
orderQueue.EndQueue();
logger.Debug("Sales_Order_Processor_Service Stopping");
}