Ожидаемое автособытие

#c# #multithreading #async-await #concurrency #thread-safety

Вопрос:

Каким был бы асинхронный (ожидаемый) эквивалент автособытия?

Если бы в классической синхронизации потоков мы использовали бы что-то подобное:

     AutoResetEvent signal = new AutoResetEvent(false);

    void Thread1Proc()
    {
        //do some stuff
        //..
        //..

        signal.WaitOne(); //wait for an outer thread to signal we are good to continue

        //do some more stuff
        //..
        //..
    }

    void Thread2Proc()
    {
        //do some stuff
        //..
        //..

        signal.Set(); //signal the other thread it's good to go

        //do some more stuff
        //..
        //..
    }
 

Я надеялся, что в новом асинхронном способе действий произойдет что-то подобное:

 SomeAsyncAutoResetEvent asyncSignal = new SomeAsyncAutoResetEvent();

async void Task1Proc()
{
    //do some stuff
    //..
    //..

    await asyncSignal.WaitOne(); //wait for an outer thread to signal we are good to continue

    //do some more stuff
    //..
    //..
}

async void Task2Proc()
{
    //do some stuff
    //..
    //..

    asyncSignal.Set(); //signal the other thread it's good to go

    //do some more stuff
    //..
    //..
}
 

Я видел другие индивидуальные решения, но то, что мне удалось заполучить в свои руки, в какой-то момент времени все еще включает блокировку потока. Я не хочу этого только ради использования нового синтаксиса ожидания. Я ищу истинный ожидаемый сигнальный механизм, который не блокирует ни один поток.

Это что-то, чего мне не хватает в Параллельной библиотеке задач?

РЕДАКТИРОВАТЬ: Просто для ясности: SomeAsyncAutoResetEvent-это полностью составное имя класса, используемое в качестве заполнителя в моем примере.

Комментарии:

1. Для одноразового использования, TaskCompletionSource результат которого игнорируется ожидающей задачей.

2. gist.github.com/AArnott/1084951 может быть?

3. @MatthewWatson Я вижу, что он использует блокировку, которая блокирует поток из пула потоков. Я надеялся на что-то, не связанное с заблокированной нитью.

4. Блокировка не обязательно означает, что поток заблокирован.

5. @DarkFalcon Правда. И в этом случае он может даже не блокировать ни один поток.

Ответ №1:

Если вы хотите построить свой собственный, у Стивена Туба есть окончательное сообщение в блоге на эту тему.

Если вы хотите использовать уже написанный, у меня есть один в моей библиотеке AsyncEx. АФАИК, на момент написания этой книги другого выхода нет.

Комментарии:

1. Почему бы не new SemaphoreSlim(1) работать, WaitOne() есть WaitAsync() и Set() становится Release()

2. AREs и семафоры очень похожи (хотя обычно используются по-разному). Семантическое различие возникает, если примитив сигнализируется, когда он уже установлен.

3. @AshleyJackson: Этот подход действительно использует другую тему. Некоторые примитивы синхронизации этого не допускают (например, Mutex , Monitor ), но поскольку это an AutoResetEvent , это должно работать.

4. Я думаю, что те, кого зовут «Стивен», рождены для чего угодно.

5. Пост Стивена Тубса, похоже, перенесли сюда

Ответ №2:

Вот источник для Стивена Туба AsyncAutoResetEvent , на случай, если его блог отключится.

 public class AsyncAutoResetEvent
{
    private static readonly Task s_completed = Task.FromResult(true);
    private readonly Queue<TaskCompletionSource<bool>> m_waits = new Queue<TaskCompletionSource<bool>>();
    private bool m_signaled;

    public Task WaitAsync()
    {
        lock (m_waits)
        {
            if (m_signaled)
            {
                m_signaled = false;
                return s_completed;
            }
            else
            {
                var tcs = new TaskCompletionSource<bool>();
                m_waits.Enqueue(tcs);
                return tcs.Task;
            }
        }
    }

    public void Set()
    {
        TaskCompletionSource<bool> toRelease = null;

        lock (m_waits)
        {
            if (m_waits.Count > 0)
                toRelease = m_waits.Dequeue();
            else if (!m_signaled)
                m_signaled = true;
        }

        toRelease?.SetResult(true);
    }
}
 

Комментарии:

1. Почему вы можете использовать обычную блокировку в ожидаемом коде? Не может ли та же задача продолжаться здесь в другом потоке и обойти блокировку?

2. @user1713059 обратите внимание, что WaitAsync на самом деле это не async метод. Это означает, что он не дает контроля на полпути к обработке. Вместо этого он получает a Task от TaskCompletionSource и возвращает его, прежде чем отпустить блокировку.

3. Ах, конечно, так что даже если я сделаю «wait WaitAsync ()», то уверен, что весь метод выполняется одним и тем же потоком, потому что на самом деле это не асинхронно — верно? Суффикс метода «Асинхронный» ввел меня в заблуждение, но из того, что я вижу, он также используется в методах без ключевого слова «асинхронный».

4. Это все еще асинхронный метод, потому что он возвращает задачу, которая может не быть завершена к моменту возврата метода. Однако метод не async является таковым , что означает, что метод не будет работать в какой-то момент в своем теле, пока он await завершается каким-то другим Task . Это соглашение для методов, которые возвращают Task (или Task<T> ), чтобы иметь Async суффикс.

5. Что касается вашего первоначального комментария, блокировка снимается до Task того, как сообщение возвращается вызывающему абоненту, поэтому у этого вызывающего абонента нет возможности обойти блокировку.

Ответ №3:

Я думаю, что на MSDN есть хороший пример: https://msdn.microsoft.com/en-us/library/hh873178(v=vs.110).aspx#WHToTap

 public static Task WaitOneAsync(this WaitHandle waitHandle)
{
    if (waitHandle == null) 
        throw new ArgumentNullException("waitHandle");

    var tcs = new TaskCompletionSource<bool>();
    var rwh = ThreadPool.RegisterWaitForSingleObject(waitHandle, 
        delegate { tcs.TrySetResult(true); }, null, -1, true);
    var t = tcs.Task;
    t.ContinueWith( (antecedent) => rwh.Unregister(null));
    return t;
}
 

Комментарии:

1. Определенно лучший ответ.

Ответ №4:

Вот версия, которую я подготовил, которая позволяет вам указать время ожидания. Он получен из решения Стивена Туба. В настоящее время мы используем это в производственных рабочих нагрузках.

 public class AsyncAutoResetEvent
{
    readonly LinkedList<TaskCompletionSource<bool>> waiters = 
        new LinkedList<TaskCompletionSource<bool>>();

    bool isSignaled;

    public AsyncAutoResetEvent(bool signaled)
    {
        this.isSignaled = signaled;
    }

    public Task<bool> WaitAsync(TimeSpan timeout)
    {
        return this.WaitAsync(timeout, CancellationToken.None);
    }

    public async Task<bool> WaitAsync(TimeSpan timeout, CancellationToken cancellationToken)
    {
        TaskCompletionSource<bool> tcs;

        lock (this.waiters)
        {
            if (this.isSignaled)
            {
                this.isSignaled = false;
                return true;
            }
            else if (timeout == TimeSpan.Zero)
            {
                return this.isSignaled;
            }
            else
            {
                tcs = new TaskCompletionSource<bool>();
                this.waiters.AddLast(tcs);
            }
        }

        Task winner = await Task.WhenAny(tcs.Task, Task.Delay(timeout, cancellationToken));
        if (winner == tcs.Task)
        {
            // The task was signaled.
            return true;
        }
        else
        {
            // We timed-out; remove our reference to the task.
            // This is an O(n) operation since waiters is a LinkedList<T>.
            lock (this.waiters)
            {
                bool removed = this.waiters.Remove(tcs);
                Debug.Assert(removed);
                return false;
            }
        }
    }

    public void Set()
    {
        lock (this.waiters)
        {
            if (this.waiters.Count > 0)
            {
                // Signal the first task in the waiters list. This must be done on a new
                // thread to avoid stack-dives and situations where we try to complete the
                // same result multiple times.
                TaskCompletionSource<bool> tcs = this.waiters.First.Value;
                Task.Run(() => tcs.SetResult(true));
                this.waiters.RemoveFirst();
            }
            else if (!this.isSignaled)
            {
                // No tasks are pending
                this.isSignaled = true;
            }
        }
    }

    public override string ToString()
    {
        return $"Signaled: {this.isSignaled.ToString()}, Waiters: {this.waiters.Count.ToString()}";
    }
}
 

Комментарии:

1. Я думаю, что это.официанты должны быть заблокированы на пути манипуляции с удалением(tcs)?

2. @HelloSam Я думаю, что ты прав! Исправлено. Спасибо, что указали на это.

3. У меня не так много времени, чтобы отладить это, но будьте предупреждены: я становлюсь тупиковым, используя это. Когда новый поток вызывает event.Set(), он зависает на toRelease.SetResult(true);

4. @Andy спасибо за комментарий. Существует дополнительное исправление, которое я сделал с тех пор, как я первоначально опубликовал это, которое, как я подозреваю, устраняет вашу тупиковую ситуацию (в моем случае это было исключение StackOverflowException). Решение состояло в том, чтобы завернуть SetResult(true) вызов в Task.Run(...) «а».

5. Я ошибаюсь или это не автоматический сброс, после которого он возвращает значение true if (winner == tcs.Task) ?

Ответ №5:

Это также работает, но таким образом может исчезнуть цель использования async и. await

 AutoResetEvent asyncSignal = new AutoResetEvent();

async void Task1Proc()
{
    //do some stuff
    //..
    //..

    await Task.Run(() => asyncSignal.WaitOne()); //wait for an outer thread to signal we are good to continue

    //do some more stuff
    //..
    //..
}
 

Комментарии:

1. Почему это считается плохим?

2. @YarekT Я вспомнил причину в то время, когда писал этот ответ несколько месяцев назад, но не сейчас. Я не думаю, что это плохо, хотя в этом есть более чем одна проблема с переключением контекста (по ключевому слову WaitOne() и по ключевому слову await).

3. Не беспокойтесь. В последнее время я больше изучаю задачи на C#. Из того, что я могу собрать, это плохо, потому что он тратит поток впустую, создавая его, а затем немедленно блокирует его ожиданием. Я видел несколько плавающих решений, которые позволяют избежать этого, каким-то образом используя таймер, но все они кажутся очень сложными. В любом случае, вот голос за

Ответ №6:

Я расширил пример из MSDN, предоставленный Олегом Гордеевым, с дополнительным временем ожидания (мс):

 public static Task WaitOneAsync(this WaitHandle waitHandle, double timeout = 0)
        {
            if (waitHandle == null) throw new ArgumentNullException("waitHandle");

            var tcs = new TaskCompletionSource<bool>();

            if (timeout > 0) 
            {
                var timer = new System.Timers.Timer(timeout) 
                { Enabled = true, AutoReset = false };

                ElapsedEventHandler del = defau<
                del = delegate (object x, System.Timers.ElapsedEventArgs y)
                {
                    tcs.TrySetResult(true);
                    timer.Elapsed -= del; 
                    timer.Dispose();
                };

                timer.Elapsed  = del;
            }
        
            var rwh = ThreadPool.RegisterWaitForSingleObject(waitHandle,
                      delegate { tcs.TrySetResult(true); },
                      null, -1, true);

            var t = tcs.Task;
            t.ContinueWith((antecedent) => rwh.Unregister(null));

            return t;
        }
 

Ответ №7:

Вот моя ПОЛНАЯ реализация SemaphoreSlim с использованием всех SemaphoreSlim.WaitAsync переопределений:

 using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

/// <summary>
/// Represents an event that, when signaled, resets automatically after releasing a single waiting task.
/// </summary>
public sealed class AutoResetEventAsync : IDisposable {

    /// <summary>
    /// Waits asynchronously until a signal is received.
    /// </summary>
    /// <returns>Task completed when the event is signaled.</returns>
    public async ValueTask WaitAsync() {
        if (CheckSignaled()) return;
        SemaphoreSlim s;
        lock (Q) Q.Enqueue(s = new(0, 1));
        await s.WaitAsync();
        lock (Q) if (Q.Count > 0 amp;amp; Q.Peek() == s) Q.Dequeue().Dispose();
    }

    /// <summary>
    /// Waits asynchronously until a signal is received or the time runs out.
    /// </summary>
    /// <param name="millisecondsTimeout">The number of milliseconds to wait, <see cref="System.Threading.Timeout.Infinite"/>
    /// (-1) to wait indefinitely, or zero to return immediately.</param>
    /// <returns>Task completed when the event is signaled or the time runs out.</returns>
    public async ValueTask WaitAsync(int millisecondsTimeout) {
        if (CheckSignaled()) return;
        SemaphoreSlim s;
        lock (Q) Q.Enqueue(s = new(0, 1));
        await s.WaitAsync(millisecondsTimeout);
        lock (Q) if (Q.Count > 0 amp;amp; Q.Peek() == s) Q.Dequeue().Dispose();
    }

    /// <summary>
    /// Waits asynchronously until a signal is received, the time runs out or the token is cancelled.
    /// </summary>
    /// <param name="millisecondsTimeout">The number of milliseconds to wait, <see cref="System.Threading.Timeout.Infinite"/>
    /// (-1) to wait indefinitely, or zero to return immediately.</param>
    /// <param name="cancellationToken">The <see cref="System.Threading.CancellationToken"/> to observe.</param>
    /// <returns>Task completed when the event is signaled, the time runs out or the token is cancelled.</returns>
    public async ValueTask WaitAsync(int millisecondsTimeout, CancellationToken cancellationToken) {
        if (CheckSignaled()) return;
        SemaphoreSlim s;
        lock (Q) Q.Enqueue(s = new(0, 1));
        try {
            await s.WaitAsync(millisecondsTimeout, cancellationToken);
        }
        finally {
            lock (Q) if (Q.Count > 0 amp;amp; Q.Peek() == s) Q.Dequeue().Dispose();
        }
    }

    /// <summary>
    /// Waits asynchronously until a signal is received or the token is cancelled.
    /// </summary>
    /// <param name="cancellationToken">The <see cref="System.Threading.CancellationToken"/> to observe.</param>
    /// <returns>Task completed when the event is signaled or the token is cancelled.</returns>
    public async ValueTask WaitAsync(CancellationToken cancellationToken) {
        if (CheckSignaled()) return;
        SemaphoreSlim s;
        lock (Q) Q.Enqueue(s = new(0, 1));
        try {
            await s.WaitAsync(cancellationToken);
        }
        finally {
            lock (Q) if (Q.Count > 0 amp;amp; Q.Peek() == s) Q.Dequeue().Dispose();
        }
    }

    /// <summary>
    /// Waits asynchronously until a signal is received or the time runs out.
    /// </summary>
    /// <param name="timeout">A <see cref="System.TimeSpan"/> that represents the number of milliseconds to wait,
    /// a <see cref="System.TimeSpan"/> that represents -1 milliseconds to wait indefinitely, or a System.TimeSpan
    /// that represents 0 milliseconds to return immediately.</param>
    /// <returns>Task completed when the event is signaled or the time runs out.</returns>
    public async ValueTask WaitAsync(TimeSpan timeout) {
        if (CheckSignaled()) return;
        SemaphoreSlim s;
        lock (Q) Q.Enqueue(s = new(0, 1));
        await s.WaitAsync(timeout);
        lock (Q) if (Q.Count > 0 amp;amp; Q.Peek() == s) Q.Dequeue().Dispose();
    }

    /// <summary>
    /// Waits asynchronously until a signal is received, the time runs out or the token is cancelled.
    /// </summary>
    /// <param name="timeout">A <see cref="System.TimeSpan"/> that represents the number of milliseconds to wait,
    /// a <see cref="System.TimeSpan"/> that represents -1 milliseconds to wait indefinitely, or a System.TimeSpan
    /// that represents 0 milliseconds to return immediately.</param>
    /// <param name="cancellationToken">The <see cref="System.Threading.CancellationToken"/> to observe.</param>
    /// <returns>Task completed when the event is signaled, the time runs out or the token is cancelled.</returns>
    public async ValueTask WaitAsync(TimeSpan timeout, CancellationToken cancellationToken) {
        if (CheckSignaled()) return;
        SemaphoreSlim s;
        lock (Q) Q.Enqueue(s = new(0, 1));
        try {
            await s.WaitAsync(timeout, cancellationToken);
        }
        finally {
            lock (Q) if (Q.Count > 0 amp;amp; Q.Peek() == s) Q.Dequeue().Dispose();
        }
    }

    /// <summary>
    /// Sets the state of the event to signaled, allowing one or more waiting tasks to proceed.
    /// </summary>
    public void Set() {
        SemaphoreSlim? toRelease = null;
        lock (Q) {
            if (Q.Count > 0) toRelease = Q.Dequeue();
            else if (!IsSignaled) IsSignaled = true;
        }
        toRelease?.Release();
    }

    /// <summary>
    /// Sets the state of the event to non nonsignaled, making the waiting tasks to wait.
    /// </summary>
    public void Reset() => IsSignaled = false;

    /// <summary>
    /// Disposes any semaphores left in the queue.
    /// </summary>
    public void Dispose() {
        lock (Q) {
            while (Q.Count > 0) Q.Dequeue().Dispose();
        }
    }

    /// <summary>
    /// Checks the <see cref="IsSignaled"/> state and resets it when it's signaled.
    /// </summary>
    /// <returns>True if the event was in signaled state.</returns>
    private bool CheckSignaled() {
        lock (Q) {
            if (IsSignaled) {
                IsSignaled = false;
                return true;
            }
            return false;
        }
    }

    private readonly Queue<SemaphoreSlim> Q = new();
    private volatile bool IsSignaled;

}
 

Я использовал SemaphoreSlim , потому что это дает поддержку токенов тайм-аута и отмены «бесплатно». Было бы еще лучше, если бы я просто изменил оригинал .ЧИСТЫЙ исходный код SemaphoreSlim , чтобы вести себя AutoResetEvent так, но нет, вот и все. Дайте мне знать, если обнаружите какие-либо ошибки.

Комментарии:

1. Является AutoResetEventAsync ли класс потокобезопасным? Если да, то что может произойти, если два потока вызовут WaitAsync() одновременно? Разве не возможно , что оба будут читать IsSignaled поле как true , прежде чем кто-либо из них выполнит IsSignaled = false; строку? Также if (Q.Contains(s)) Q.Dequeue().Dispose(); строка ищет, s существует ли он в очереди, а затем снимает очередь и размещает какой-либо другой семафор (скорее всего). Это намеренно?

2. @Теодорзулиас : Да, потому что даже если 2 потока могут войти WaitAsync одновременно, они не смогут передать Q доступ одновременно. Обратите внимание, что Q доступ к нему возможен только с помощью одного потока. Это делает поток простым и прямым. Это также означает, что внутреннее ожидание доступно только для одного потока. Таким образом, невозможно, чтобы недействительный семафор был отключен. Несколько тестов, которые я провел для этого класса, еще не провалились, но это не доказывает, что это действительно. Я думаю, что доступ к Q с одним протектором делает это.

3. Я говорю об этой строке: if (IsSignaled) { IsSignaled = false; return; } . Это не защищено замком. IsSignaled Это даже не volatile поле. Что касается if (Q.Contains(s)) , то, если вы уверены, что код s может быть только во главе очереди, if (Q.Peak() == s) было бы быстрее и выразительнее в отношении намерений кода. Кстати, что произойдет, если cancellationToken отменят и WaitAsync броски?

4. Вы нашли несколько интересных крайних случаев. Я постараюсь исправить их и отредактировать свой пример… БРБ.

5. Спасибо за понимание, тогда я буду использовать лучшую версию с вашим расширением. В любом случае, это того стоило в качестве учебного опыта. Ты лучший игрок.

Ответ №8:

Вот моя версия одноразового события, которого могут ожидать несколько потоков. Он внутренне полагается на BoundedChannel .

 public class AsyncOneTimeEvent<T>
{
    private T Result { get; set; }

    private readonly Channel<bool> _channel = Channel.CreateBounded<bool>(new BoundedChannelOptions(1)
    {
        SingleReader = false,
        SingleWriter = true,
        FullMode = BoundedChannelFullMode.DropWrite,
    });

    public async Task<T> GetResult()
    {
        await _channel.Reader.WaitToReadAsync().ConfigureAwait(false);

        return this.Resu<
    }

    public void SetResult(T result)
    {
        this.Result = resu<
        _channel.Writer.Complete();
    }

    public void SetError(Exception ex)
    {
        _channel.Writer.Complete(ex);
    }
}
 

Комментарии:

1. Использование а Channel в качестве замены а TaskCompletionSource кажется разумной идеей. Но в этом также нет необходимости, и реализация, по-видимому, подвержена проблемам видимости. Я не уверен, что все потоки будут «видеть» последнее значение энергонезависимого private T Result поля во всех случаях.

2. Пример: Поток A вводит GetResult() метод, считывает значение Result не по порядку, а затем приостанавливается операционной системой. Поток B входит SetResult в метод и выходит из него. Поток A возобновляется, синхронно выполняет await _channel.Reader.WaitToReadAsync() строку и возвращает Task значение, имеющее default(T) значение. Является ли этот сценарий невозможным на основе спецификации C# ECMA-334? Я понятия не имею!

3. @TheodorZoulias конечно, это работает, вы можете попробовать это онлайн : dotnetfiddle.net/uyQRG1

4. Я уверен, что это работает. Я не уверен, что он гарантированно будет корректно работать на всех архитектурах процессоров. Проблемы с видимостью, как известно, трудно отлаживать. Вы можете прочитать эту статью Игоря Островского, чтобы понять, почему.