#c# #multithreading #async-await #concurrency #thread-safety
Вопрос:
Каким был бы асинхронный (ожидаемый) эквивалент автособытия?
Если бы в классической синхронизации потоков мы использовали бы что-то подобное:
AutoResetEvent signal = new AutoResetEvent(false);
void Thread1Proc()
{
//do some stuff
//..
//..
signal.WaitOne(); //wait for an outer thread to signal we are good to continue
//do some more stuff
//..
//..
}
void Thread2Proc()
{
//do some stuff
//..
//..
signal.Set(); //signal the other thread it's good to go
//do some more stuff
//..
//..
}
Я надеялся, что в новом асинхронном способе действий произойдет что-то подобное:
SomeAsyncAutoResetEvent asyncSignal = new SomeAsyncAutoResetEvent();
async void Task1Proc()
{
//do some stuff
//..
//..
await asyncSignal.WaitOne(); //wait for an outer thread to signal we are good to continue
//do some more stuff
//..
//..
}
async void Task2Proc()
{
//do some stuff
//..
//..
asyncSignal.Set(); //signal the other thread it's good to go
//do some more stuff
//..
//..
}
Я видел другие индивидуальные решения, но то, что мне удалось заполучить в свои руки, в какой-то момент времени все еще включает блокировку потока. Я не хочу этого только ради использования нового синтаксиса ожидания. Я ищу истинный ожидаемый сигнальный механизм, который не блокирует ни один поток.
Это что-то, чего мне не хватает в Параллельной библиотеке задач?
РЕДАКТИРОВАТЬ: Просто для ясности: SomeAsyncAutoResetEvent-это полностью составное имя класса, используемое в качестве заполнителя в моем примере.
Комментарии:
1. Для одноразового использования,
TaskCompletionSource
результат которого игнорируется ожидающей задачей.2. gist.github.com/AArnott/1084951 может быть?
3. @MatthewWatson Я вижу, что он использует блокировку, которая блокирует поток из пула потоков. Я надеялся на что-то, не связанное с заблокированной нитью.
4. Блокировка не обязательно означает, что поток заблокирован.
5. @DarkFalcon Правда. И в этом случае он может даже не блокировать ни один поток.
Ответ №1:
Если вы хотите построить свой собственный, у Стивена Туба есть окончательное сообщение в блоге на эту тему.
Если вы хотите использовать уже написанный, у меня есть один в моей библиотеке AsyncEx. АФАИК, на момент написания этой книги другого выхода нет.
Комментарии:
1. Почему бы не
new SemaphoreSlim(1)
работать,WaitOne()
естьWaitAsync()
иSet()
становитсяRelease()
2. AREs и семафоры очень похожи (хотя обычно используются по-разному). Семантическое различие возникает, если примитив сигнализируется, когда он уже установлен.
3. @AshleyJackson: Этот подход действительно использует другую тему. Некоторые примитивы синхронизации этого не допускают (например,
Mutex
,Monitor
), но поскольку это anAutoResetEvent
, это должно работать.4. Я думаю, что те, кого зовут «Стивен», рождены для чего угодно.
5. Пост Стивена Тубса, похоже, перенесли сюда
Ответ №2:
Вот источник для Стивена Туба AsyncAutoResetEvent
, на случай, если его блог отключится.
public class AsyncAutoResetEvent
{
private static readonly Task s_completed = Task.FromResult(true);
private readonly Queue<TaskCompletionSource<bool>> m_waits = new Queue<TaskCompletionSource<bool>>();
private bool m_signaled;
public Task WaitAsync()
{
lock (m_waits)
{
if (m_signaled)
{
m_signaled = false;
return s_completed;
}
else
{
var tcs = new TaskCompletionSource<bool>();
m_waits.Enqueue(tcs);
return tcs.Task;
}
}
}
public void Set()
{
TaskCompletionSource<bool> toRelease = null;
lock (m_waits)
{
if (m_waits.Count > 0)
toRelease = m_waits.Dequeue();
else if (!m_signaled)
m_signaled = true;
}
toRelease?.SetResult(true);
}
}
Комментарии:
1. Почему вы можете использовать обычную блокировку в ожидаемом коде? Не может ли та же задача продолжаться здесь в другом потоке и обойти блокировку?
2. @user1713059 обратите внимание, что
WaitAsync
на самом деле это неasync
метод. Это означает, что он не дает контроля на полпути к обработке. Вместо этого он получает aTask
отTaskCompletionSource
и возвращает его, прежде чем отпустить блокировку.3. Ах, конечно, так что даже если я сделаю «wait WaitAsync ()», то уверен, что весь метод выполняется одним и тем же потоком, потому что на самом деле это не асинхронно — верно? Суффикс метода «Асинхронный» ввел меня в заблуждение, но из того, что я вижу, он также используется в методах без ключевого слова «асинхронный».
4. Это все еще асинхронный метод, потому что он возвращает задачу, которая может не быть завершена к моменту возврата метода. Однако метод не
async
является таковым , что означает, что метод не будет работать в какой-то момент в своем теле, пока онawait
завершается каким-то другимTask
. Это соглашение для методов, которые возвращаютTask
(илиTask<T>
), чтобы иметьAsync
суффикс.5. Что касается вашего первоначального комментария, блокировка снимается до
Task
того, как сообщение возвращается вызывающему абоненту, поэтому у этого вызывающего абонента нет возможности обойти блокировку.
Ответ №3:
Я думаю, что на MSDN есть хороший пример: https://msdn.microsoft.com/en-us/library/hh873178(v=vs.110).aspx#WHToTap
public static Task WaitOneAsync(this WaitHandle waitHandle)
{
if (waitHandle == null)
throw new ArgumentNullException("waitHandle");
var tcs = new TaskCompletionSource<bool>();
var rwh = ThreadPool.RegisterWaitForSingleObject(waitHandle,
delegate { tcs.TrySetResult(true); }, null, -1, true);
var t = tcs.Task;
t.ContinueWith( (antecedent) => rwh.Unregister(null));
return t;
}
Комментарии:
1. Определенно лучший ответ.
Ответ №4:
Вот версия, которую я подготовил, которая позволяет вам указать время ожидания. Он получен из решения Стивена Туба. В настоящее время мы используем это в производственных рабочих нагрузках.
public class AsyncAutoResetEvent
{
readonly LinkedList<TaskCompletionSource<bool>> waiters =
new LinkedList<TaskCompletionSource<bool>>();
bool isSignaled;
public AsyncAutoResetEvent(bool signaled)
{
this.isSignaled = signaled;
}
public Task<bool> WaitAsync(TimeSpan timeout)
{
return this.WaitAsync(timeout, CancellationToken.None);
}
public async Task<bool> WaitAsync(TimeSpan timeout, CancellationToken cancellationToken)
{
TaskCompletionSource<bool> tcs;
lock (this.waiters)
{
if (this.isSignaled)
{
this.isSignaled = false;
return true;
}
else if (timeout == TimeSpan.Zero)
{
return this.isSignaled;
}
else
{
tcs = new TaskCompletionSource<bool>();
this.waiters.AddLast(tcs);
}
}
Task winner = await Task.WhenAny(tcs.Task, Task.Delay(timeout, cancellationToken));
if (winner == tcs.Task)
{
// The task was signaled.
return true;
}
else
{
// We timed-out; remove our reference to the task.
// This is an O(n) operation since waiters is a LinkedList<T>.
lock (this.waiters)
{
bool removed = this.waiters.Remove(tcs);
Debug.Assert(removed);
return false;
}
}
}
public void Set()
{
lock (this.waiters)
{
if (this.waiters.Count > 0)
{
// Signal the first task in the waiters list. This must be done on a new
// thread to avoid stack-dives and situations where we try to complete the
// same result multiple times.
TaskCompletionSource<bool> tcs = this.waiters.First.Value;
Task.Run(() => tcs.SetResult(true));
this.waiters.RemoveFirst();
}
else if (!this.isSignaled)
{
// No tasks are pending
this.isSignaled = true;
}
}
}
public override string ToString()
{
return $"Signaled: {this.isSignaled.ToString()}, Waiters: {this.waiters.Count.ToString()}";
}
}
Комментарии:
1. Я думаю, что это.официанты должны быть заблокированы на пути манипуляции с удалением(tcs)?
2. @HelloSam Я думаю, что ты прав! Исправлено. Спасибо, что указали на это.
3. У меня не так много времени, чтобы отладить это, но будьте предупреждены: я становлюсь тупиковым, используя это. Когда новый поток вызывает event.Set(), он зависает на
toRelease.SetResult(true);
4. @Andy спасибо за комментарий. Существует дополнительное исправление, которое я сделал с тех пор, как я первоначально опубликовал это, которое, как я подозреваю, устраняет вашу тупиковую ситуацию (в моем случае это было исключение StackOverflowException). Решение состояло в том, чтобы завернуть
SetResult(true)
вызов вTask.Run(...)
«а».5. Я ошибаюсь или это не автоматический сброс, после которого он возвращает значение true
if (winner == tcs.Task)
?
Ответ №5:
Это также работает, но таким образом может исчезнуть цель использования async
и. await
AutoResetEvent asyncSignal = new AutoResetEvent();
async void Task1Proc()
{
//do some stuff
//..
//..
await Task.Run(() => asyncSignal.WaitOne()); //wait for an outer thread to signal we are good to continue
//do some more stuff
//..
//..
}
Комментарии:
1. Почему это считается плохим?
2. @YarekT Я вспомнил причину в то время, когда писал этот ответ несколько месяцев назад, но не сейчас. Я не думаю, что это плохо, хотя в этом есть более чем одна проблема с переключением контекста (по ключевому слову WaitOne() и по ключевому слову await).
3. Не беспокойтесь. В последнее время я больше изучаю задачи на C#. Из того, что я могу собрать, это плохо, потому что он тратит поток впустую, создавая его, а затем немедленно блокирует его ожиданием. Я видел несколько плавающих решений, которые позволяют избежать этого, каким-то образом используя таймер, но все они кажутся очень сложными. В любом случае, вот голос за
Ответ №6:
Я расширил пример из MSDN, предоставленный Олегом Гордеевым, с дополнительным временем ожидания (мс):
public static Task WaitOneAsync(this WaitHandle waitHandle, double timeout = 0)
{
if (waitHandle == null) throw new ArgumentNullException("waitHandle");
var tcs = new TaskCompletionSource<bool>();
if (timeout > 0)
{
var timer = new System.Timers.Timer(timeout)
{ Enabled = true, AutoReset = false };
ElapsedEventHandler del = defau<
del = delegate (object x, System.Timers.ElapsedEventArgs y)
{
tcs.TrySetResult(true);
timer.Elapsed -= del;
timer.Dispose();
};
timer.Elapsed = del;
}
var rwh = ThreadPool.RegisterWaitForSingleObject(waitHandle,
delegate { tcs.TrySetResult(true); },
null, -1, true);
var t = tcs.Task;
t.ContinueWith((antecedent) => rwh.Unregister(null));
return t;
}
Ответ №7:
Вот моя ПОЛНАЯ реализация SemaphoreSlim
с использованием всех SemaphoreSlim.WaitAsync
переопределений:
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
/// <summary>
/// Represents an event that, when signaled, resets automatically after releasing a single waiting task.
/// </summary>
public sealed class AutoResetEventAsync : IDisposable {
/// <summary>
/// Waits asynchronously until a signal is received.
/// </summary>
/// <returns>Task completed when the event is signaled.</returns>
public async ValueTask WaitAsync() {
if (CheckSignaled()) return;
SemaphoreSlim s;
lock (Q) Q.Enqueue(s = new(0, 1));
await s.WaitAsync();
lock (Q) if (Q.Count > 0 amp;amp; Q.Peek() == s) Q.Dequeue().Dispose();
}
/// <summary>
/// Waits asynchronously until a signal is received or the time runs out.
/// </summary>
/// <param name="millisecondsTimeout">The number of milliseconds to wait, <see cref="System.Threading.Timeout.Infinite"/>
/// (-1) to wait indefinitely, or zero to return immediately.</param>
/// <returns>Task completed when the event is signaled or the time runs out.</returns>
public async ValueTask WaitAsync(int millisecondsTimeout) {
if (CheckSignaled()) return;
SemaphoreSlim s;
lock (Q) Q.Enqueue(s = new(0, 1));
await s.WaitAsync(millisecondsTimeout);
lock (Q) if (Q.Count > 0 amp;amp; Q.Peek() == s) Q.Dequeue().Dispose();
}
/// <summary>
/// Waits asynchronously until a signal is received, the time runs out or the token is cancelled.
/// </summary>
/// <param name="millisecondsTimeout">The number of milliseconds to wait, <see cref="System.Threading.Timeout.Infinite"/>
/// (-1) to wait indefinitely, or zero to return immediately.</param>
/// <param name="cancellationToken">The <see cref="System.Threading.CancellationToken"/> to observe.</param>
/// <returns>Task completed when the event is signaled, the time runs out or the token is cancelled.</returns>
public async ValueTask WaitAsync(int millisecondsTimeout, CancellationToken cancellationToken) {
if (CheckSignaled()) return;
SemaphoreSlim s;
lock (Q) Q.Enqueue(s = new(0, 1));
try {
await s.WaitAsync(millisecondsTimeout, cancellationToken);
}
finally {
lock (Q) if (Q.Count > 0 amp;amp; Q.Peek() == s) Q.Dequeue().Dispose();
}
}
/// <summary>
/// Waits asynchronously until a signal is received or the token is cancelled.
/// </summary>
/// <param name="cancellationToken">The <see cref="System.Threading.CancellationToken"/> to observe.</param>
/// <returns>Task completed when the event is signaled or the token is cancelled.</returns>
public async ValueTask WaitAsync(CancellationToken cancellationToken) {
if (CheckSignaled()) return;
SemaphoreSlim s;
lock (Q) Q.Enqueue(s = new(0, 1));
try {
await s.WaitAsync(cancellationToken);
}
finally {
lock (Q) if (Q.Count > 0 amp;amp; Q.Peek() == s) Q.Dequeue().Dispose();
}
}
/// <summary>
/// Waits asynchronously until a signal is received or the time runs out.
/// </summary>
/// <param name="timeout">A <see cref="System.TimeSpan"/> that represents the number of milliseconds to wait,
/// a <see cref="System.TimeSpan"/> that represents -1 milliseconds to wait indefinitely, or a System.TimeSpan
/// that represents 0 milliseconds to return immediately.</param>
/// <returns>Task completed when the event is signaled or the time runs out.</returns>
public async ValueTask WaitAsync(TimeSpan timeout) {
if (CheckSignaled()) return;
SemaphoreSlim s;
lock (Q) Q.Enqueue(s = new(0, 1));
await s.WaitAsync(timeout);
lock (Q) if (Q.Count > 0 amp;amp; Q.Peek() == s) Q.Dequeue().Dispose();
}
/// <summary>
/// Waits asynchronously until a signal is received, the time runs out or the token is cancelled.
/// </summary>
/// <param name="timeout">A <see cref="System.TimeSpan"/> that represents the number of milliseconds to wait,
/// a <see cref="System.TimeSpan"/> that represents -1 milliseconds to wait indefinitely, or a System.TimeSpan
/// that represents 0 milliseconds to return immediately.</param>
/// <param name="cancellationToken">The <see cref="System.Threading.CancellationToken"/> to observe.</param>
/// <returns>Task completed when the event is signaled, the time runs out or the token is cancelled.</returns>
public async ValueTask WaitAsync(TimeSpan timeout, CancellationToken cancellationToken) {
if (CheckSignaled()) return;
SemaphoreSlim s;
lock (Q) Q.Enqueue(s = new(0, 1));
try {
await s.WaitAsync(timeout, cancellationToken);
}
finally {
lock (Q) if (Q.Count > 0 amp;amp; Q.Peek() == s) Q.Dequeue().Dispose();
}
}
/// <summary>
/// Sets the state of the event to signaled, allowing one or more waiting tasks to proceed.
/// </summary>
public void Set() {
SemaphoreSlim? toRelease = null;
lock (Q) {
if (Q.Count > 0) toRelease = Q.Dequeue();
else if (!IsSignaled) IsSignaled = true;
}
toRelease?.Release();
}
/// <summary>
/// Sets the state of the event to non nonsignaled, making the waiting tasks to wait.
/// </summary>
public void Reset() => IsSignaled = false;
/// <summary>
/// Disposes any semaphores left in the queue.
/// </summary>
public void Dispose() {
lock (Q) {
while (Q.Count > 0) Q.Dequeue().Dispose();
}
}
/// <summary>
/// Checks the <see cref="IsSignaled"/> state and resets it when it's signaled.
/// </summary>
/// <returns>True if the event was in signaled state.</returns>
private bool CheckSignaled() {
lock (Q) {
if (IsSignaled) {
IsSignaled = false;
return true;
}
return false;
}
}
private readonly Queue<SemaphoreSlim> Q = new();
private volatile bool IsSignaled;
}
Я использовал SemaphoreSlim
, потому что это дает поддержку токенов тайм-аута и отмены «бесплатно». Было бы еще лучше, если бы я просто изменил оригинал .ЧИСТЫЙ исходный код SemaphoreSlim
, чтобы вести себя AutoResetEvent
так, но нет, вот и все. Дайте мне знать, если обнаружите какие-либо ошибки.
Комментарии:
1. Является
AutoResetEventAsync
ли класс потокобезопасным? Если да, то что может произойти, если два потока вызовутWaitAsync()
одновременно? Разве не возможно , что оба будут читатьIsSignaled
поле какtrue
, прежде чем кто-либо из них выполнитIsSignaled = false;
строку? Такжеif (Q.Contains(s)) Q.Dequeue().Dispose();
строка ищет,s
существует ли он в очереди, а затем снимает очередь и размещает какой-либо другой семафор (скорее всего). Это намеренно?2. @Теодорзулиас : Да, потому что даже если 2 потока могут войти
WaitAsync
одновременно, они не смогут передатьQ
доступ одновременно. Обратите внимание, чтоQ
доступ к нему возможен только с помощью одного потока. Это делает поток простым и прямым. Это также означает, что внутреннее ожидание доступно только для одного потока. Таким образом, невозможно, чтобы недействительный семафор был отключен. Несколько тестов, которые я провел для этого класса, еще не провалились, но это не доказывает, что это действительно. Я думаю, что доступ к Q с одним протектором делает это.3. Я говорю об этой строке:
if (IsSignaled) { IsSignaled = false; return; }
. Это не защищено замком.IsSignaled
Это даже неvolatile
поле. Что касаетсяif (Q.Contains(s))
, то, если вы уверены, что кодs
может быть только во главе очереди,if (Q.Peak() == s)
было бы быстрее и выразительнее в отношении намерений кода. Кстати, что произойдет, еслиcancellationToken
отменят иWaitAsync
броски?4. Вы нашли несколько интересных крайних случаев. Я постараюсь исправить их и отредактировать свой пример… БРБ.
5. Спасибо за понимание, тогда я буду использовать лучшую версию с вашим расширением. В любом случае, это того стоило в качестве учебного опыта. Ты лучший игрок.
Ответ №8:
Вот моя версия одноразового события, которого могут ожидать несколько потоков. Он внутренне полагается на BoundedChannel
.
public class AsyncOneTimeEvent<T>
{
private T Result { get; set; }
private readonly Channel<bool> _channel = Channel.CreateBounded<bool>(new BoundedChannelOptions(1)
{
SingleReader = false,
SingleWriter = true,
FullMode = BoundedChannelFullMode.DropWrite,
});
public async Task<T> GetResult()
{
await _channel.Reader.WaitToReadAsync().ConfigureAwait(false);
return this.Resu<
}
public void SetResult(T result)
{
this.Result = resu<
_channel.Writer.Complete();
}
public void SetError(Exception ex)
{
_channel.Writer.Complete(ex);
}
}
Комментарии:
1. Использование а
Channel
в качестве замены аTaskCompletionSource
кажется разумной идеей. Но в этом также нет необходимости, и реализация, по-видимому, подвержена проблемам видимости. Я не уверен, что все потоки будут «видеть» последнее значение энергонезависимогоprivate T Result
поля во всех случаях.2. Пример: Поток A вводит
GetResult()
метод, считывает значениеResult
не по порядку, а затем приостанавливается операционной системой. Поток B входитSetResult
в метод и выходит из него. Поток A возобновляется, синхронно выполняетawait _channel.Reader.WaitToReadAsync()
строку и возвращаетTask
значение, имеющееdefault(T)
значение. Является ли этот сценарий невозможным на основе спецификации C# ECMA-334? Я понятия не имею!3. @TheodorZoulias конечно, это работает, вы можете попробовать это онлайн : dotnetfiddle.net/uyQRG1
4. Я уверен, что это работает. Я не уверен, что он гарантированно будет корректно работать на всех архитектурах процессоров. Проблемы с видимостью, как известно, трудно отлаживать. Вы можете прочитать эту статью Игоря Островского, чтобы понять, почему.