C # — Почему мой TaskScheduler заменяется на стандартный после определенных вызовов await?

#c# #.net #async-await #task-parallel-library #taskscheduler

#c# #.net #async-await #task-parallel-library #taskscheduler

Вопрос:

Я пытаюсь установить пользовательский TaskScheduler , но он не работает для вложенных асинхронных методов. Планировщик таинственным образом меняется на стандартный. Вот код:

 using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;

namespace MyApp
{
    internal static class Program
    {
        public static void Main(string[] args)
        {
            var scheduler = new MyTaskScheduler();
            var context = new MySynchronizationContext(scheduler);
            context.Post(Run, null);
            Thread.Sleep(3000);
        }

        private static void Run(object state)
        {
            RunAsync(); // no await here, intentional.
        }

        private static async Task RunAsync()
        {
            Logger.WriteLine("Before InternalRunAsync"); // <-- Scheduler OK.
            await InternalRunAsync();
            Logger.WriteLine("After InternalRunAsync"); // <-- Scheduler changed. WHY???
        }

        private static async Task InternalRunAsync()
        {
            Logger.WriteLine("Before Delay"); // <-- Scheduler OK.
            await Task.Delay(TimeSpan.FromSeconds(1.5));
            Logger.WriteLine("After Delay"); // <-- Scheduler OK.
        }
    }

    internal static class Logger
    {
        private static readonly Stopwatch sw = Stopwatch.StartNew();

        public static void WriteLine(string msg)
        {
            var nativeThreadId = AppDomain.GetCurrentThreadId();
            var threadId = Thread.CurrentThread.ManagedThreadId;
            var syncContext = SynchronizationContext.Current?.ToString() ?? "null";
            var scheduler = TaskScheduler.Current.ToString();
            Console.WriteLine(
                $"[{sw.Elapsed.TotalSeconds} native:{nativeThreadId} managed:{threadId} syncCtx:{syncContext} sch:{scheduler}] {msg}");
        }
    }

    internal sealed class MySynchronizationContext : SynchronizationContext
    {
        private readonly MyTaskScheduler scheduler;

        public MySynchronizationContext(MyTaskScheduler scheduler)
        {
            this.scheduler = scheduler;
        }

        public override SynchronizationContext CreateCopy()
        {
            return new MySynchronizationContext(scheduler);
        }

        public override void Post(SendOrPostCallback d, object state)
        {
            var caller = new SetContextWrapper(this, d, state);
            new Task(caller.Run).Start(scheduler);
        }

        public override void Send(SendOrPostCallback d, object state)
        {
            var caller = new SetContextWrapper(this, d, state);
            new Task(caller.Run).RunSynchronously(scheduler);
        }

        private sealed class SetContextWrapper
        {
            private readonly MySynchronizationContext _context;
            private readonly SendOrPostCallback _callback;
            private readonly object _state;

            public SetContextWrapper(MySynchronizationContext context, SendOrPostCallback callback, object state)
            {
                _context = context;
                _callback = callback;
                _state = state;
            }

            public void Run()
            {
                SynchronizationContext.SetSynchronizationContext(_context);
                _callback(_state);
            }
        }
    }

    internal sealed class MyTaskScheduler : TaskScheduler
    {
        protected override void QueueTask(Task task)
        {
            ThreadPool.QueueUserWorkItem(RunTask, task);
        }

        private void RunTask(object state)
        {
            var task = (Task) state;
            TryExecuteTask(task);
        }

        protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
        {
            return TryExecuteTask(task);
        }

        protected override IEnumerable<Task> GetScheduledTasks()
        {
            return new List<Task>();
        }
    }
}
  

Вывод:

 [0.0003366 native:13400 managed:3 syncCtx:MyApp.MySynchronizationContext sch:MyApp.MyTaskScheduler] Before InternalRunAsync
[0.0016017 native:13400 managed:3 syncCtx:MyApp.MySynchronizationContext sch:MyApp.MyTaskScheduler] Before Delay
[1.5056582 native:13400 managed:3 syncCtx:MyApp.MySynchronizationContext sch:MyApp.MyTaskScheduler] After Delay
[1.5059042 native:13400 managed:3 syncCtx:MyApp.MySynchronizationContext sch:System.Threading.Tasks.ThreadPoolTaskScheduler] After InternalRunAsync
  

Обратите внимание на последнюю строку. Это показывает, что после того, как метод InternalRunAsync возвращается, планировщик был сброшен на ThreadPoolTaskScheduler . Почему?

Комментарии:

1. TaskScheduler.Current определяется как TaskScheduler выполняемый в данный момент Task . Задача, представляющая async метод, никогда не считается текущей. SynchronizationContext продолжение может обойти SynchronizationContext Post / Send и встроиться в текущий поток, когда он уже находится в цели SynchronizationContext .

2. Я знаю. Но продолжение является дочерним элементом задачи, которая использует правильный планировщик. Продолжение, которое выводит «После задержки», использует правильный планировщик. Тогда нет переключения потоков, и второе продолжение использует другой планировщик. Почему меняется планировщик?

3. Тогда нет переключения потоков , что неверно. Этот поток не принадлежит вам, пока ваша программа ожидает Task.Delay . Это просто совпадение, что продолжение выполняется в том же потоке. Вам просто нужно добавить больше протоколирования, и вы увидите, что задействовано более одного потока.

4. Я имею в виду, что нет переключения потоков между After Delay (который использует правильный планировщик) и After InternalRunAsync (который использует неправильный планировщик).

5. After Delay отправлено через SynchronizationContext , который выполнил продолжение в задаче. After InternalRunAsync встроен, таким образом, его код не принадлежит ни одному Task .