#c# #async-await #queue #task
Вопрос:
У меня есть класс, который содержит очередь запросов, которые будут собраны и отправлены в веб-API по вызову HTTP через интервал времени не более 1 секунды:
public class AsyncValueTimerIntervalWriter
{
private class ValueRequest
{
public string FullName { get; set; }
public object Value { get; set; }
}
private readonly IValuesClient _valuesClient; // auto generated Swagger HTTP client
private List<ValueRequest> _valueRequests = new List<ValueRequest>();
private object _valuesLock = new object();
private Timer _timer;
public AsyncValueTimerIntervalWriter(IValuesClient valuesClient)
{
_valuesClient = valuesClient;
}
public void Start()
{
_timer = new Timer(o => WriteValuesToServer(), null, 0, 1000);
}
public void Stop()
{
_timer?.Dispose();
_timer = null;
}
public void AddWrite(string fullName, object value)
{
lock (_valuesLock)
{
_valueRequests.Add(new ValueRequest { FullName = fullName, Value = value });
}
}
private async void WriteValuesToServer()
{
IList<ValueRequest> values;
lock (_valuesLock)
{
values = _valueRequests.ToArray();
_valueRequests.Clear();
}
if (values.Any())
{
await _valuesClient.SetValuesAsync(values); // Sends HTTP POST request
}
}
}
Пример вызывающего абонента:
var asyncWriter = new AsyncValueTimerIntervalWriter(...);
asyncWriter.AddWrite("My.Var.Tree.VarName", 1234);
asyncWriter.AddWrite("My.Var.Tree.AnotherVar", "Test");
// after max 1 sec the values are written to server
Моя цель состоит в том, чтобы написать асинхронный метод, который также добавляет значение для записи и возвращает его при записи:
await asyncWriter.WriteAsync("My.Var.Tree.VarName", 1234);
// should continue after written to server
Важно: мне нужно обрабатывать запросы в очереди, потому что модуль записи может быть остановлен в любое время, и ему не разрешается удалять запросы. После повторного запуска записи добавленные запросы должны быть отправлены на сервер.
Я пытался использовать ManualResetEvent, но это кажется странным:
...
public Task WriteAsync(string fullName, object value)
{
var resetEvent = new ManualResetEvent(false);
lock (_valuesLock)
{
_valueRequests.Add(
new ValueRequest
{
FullName = fullName,
Value = value,
CompletedEvent = resetEvent
});
}
resetEvent.WaitOne();
return Task.CompletedTask;
}
private async void WriteValuesToServer()
{
IList<ValueRequest> values;
lock (_valuesLock)
{
values = _valueRequests.ToArray();
_valueRequests.Clear();
}
if (values.Any())
{
await _valuesClient.SetValuesAsync(values); // Sends HTTP POST request
foreach (var value as values)
value.CompletedEvent?.Set();
}
}
...
Есть какие-нибудь предложения?
Ответ №1:
Вы можете использовать TaskCompletionSource
класс ValueEntry для передачи сигнала от записывающего устройства вызывающему.
private class ValueEntry
{
public string FullName { get; set; }
public object Value { get; set; }
protected readonly TaskCompletionSource _tcs = new TaskCompleteionSource();
public Task AwaitCompletion()
{
return _tcs.Task;
}
public Task MarkComplete()
{
return _tcs.SetResult();
}
}
Небольшое изменение в WriteValuesToServer:
public async Task WriteValuesToServer()
{
// snip
if (values.Any())
{
await _emsClient.SetValuesAsync(values); // Sends HTTP POST request
foreach (var value as values)
await value.MarkComplete();
}
}
Теперь ваш писатель очень прост:
public Task WriteAsync(string fullName, object value)
{
var request = new ValueRequest { FullName = fullName, Value = value };
lock (_valuesLock)
{
_valueRequests.Add(request)
};
await request.AwaitCompletion();
}
Кроме того, я предлагаю вам изучить возможность использования коллекции блоков, которая предназначена для обработки очереди производителей/потребителей и позволит вам избавиться от большинства ваших lock
блоков.