Создайте асинхронный метод для HTTP-вызова на основе таймера в очереди

#c# #async-await #queue #task

Вопрос:

У меня есть класс, который содержит очередь запросов, которые будут собраны и отправлены в веб-API по вызову HTTP через интервал времени не более 1 секунды:

     public class AsyncValueTimerIntervalWriter
    {
        private class ValueRequest
        {
            public string FullName { get; set; }
            public object Value { get; set; }
        }

        private readonly IValuesClient _valuesClient; // auto generated Swagger HTTP client

        private List<ValueRequest> _valueRequests = new List<ValueRequest>();
        private object _valuesLock = new object();

        private Timer _timer;

        public AsyncValueTimerIntervalWriter(IValuesClient valuesClient)
        {
            _valuesClient = valuesClient;
        }

        public void Start() 
        {
            _timer = new Timer(o => WriteValuesToServer(), null, 0, 1000);
        }

        public void Stop() 
        {
            _timer?.Dispose();
            _timer = null;
        }

        public void AddWrite(string fullName, object value)
        {
            lock (_valuesLock)
            {
                _valueRequests.Add(new ValueRequest { FullName = fullName, Value = value });
            }
        }

        private async void WriteValuesToServer()
        {
            IList<ValueRequest> values;

            lock (_valuesLock)
            {
                values = _valueRequests.ToArray();
                _valueRequests.Clear();
            }

            if (values.Any())
            {
                await _valuesClient.SetValuesAsync(values); // Sends HTTP POST request
            }
        }
    }
 

Пример вызывающего абонента:

 var asyncWriter = new AsyncValueTimerIntervalWriter(...);
asyncWriter.AddWrite("My.Var.Tree.VarName", 1234);
asyncWriter.AddWrite("My.Var.Tree.AnotherVar", "Test");

// after max 1 sec the values are written to server
 

Моя цель состоит в том, чтобы написать асинхронный метод, который также добавляет значение для записи и возвращает его при записи:

 await asyncWriter.WriteAsync("My.Var.Tree.VarName", 1234);
// should continue after written to server
 

Важно: мне нужно обрабатывать запросы в очереди, потому что модуль записи может быть остановлен в любое время, и ему не разрешается удалять запросы. После повторного запуска записи добавленные запросы должны быть отправлены на сервер.

Я пытался использовать ManualResetEvent, но это кажется странным:

 ...
public Task WriteAsync(string fullName, object value)
{
    var resetEvent = new ManualResetEvent(false);

    lock (_valuesLock)
    {
        _valueRequests.Add(
            new ValueRequest 
            { 
                FullName = fullName, 
                Value = value, 
                CompletedEvent = resetEvent 
            });
    }

    resetEvent.WaitOne();

    return Task.CompletedTask;
}

private async void WriteValuesToServer()
{
    IList<ValueRequest> values;

    lock (_valuesLock)
    {
        values = _valueRequests.ToArray();
        _valueRequests.Clear();
    }

    if (values.Any())
    {
        await _valuesClient.SetValuesAsync(values); // Sends HTTP POST request

        foreach (var value as values)
            value.CompletedEvent?.Set();
    }
}
...
 

Есть какие-нибудь предложения?

Ответ №1:

Вы можете использовать TaskCompletionSource класс ValueEntry для передачи сигнала от записывающего устройства вызывающему.

 private class ValueEntry
{
    public string FullName { get; set; }
    public object Value { get; set; }

    protected readonly TaskCompletionSource _tcs = new TaskCompleteionSource();

    public Task AwaitCompletion()
    {
        return _tcs.Task;
    }

    public Task MarkComplete()
    {
        return _tcs.SetResult();
    }
}
 

Небольшое изменение в WriteValuesToServer:

 public async Task WriteValuesToServer()
{
    // snip 
    if (values.Any())
    {
        await _emsClient.SetValuesAsync(values); // Sends HTTP POST request

        foreach (var value as values)
            await value.MarkComplete();
    }
}
 

Теперь ваш писатель очень прост:

 public Task WriteAsync(string fullName, object value)
{
    var request = new ValueRequest { FullName = fullName, Value = value };

    lock (_valuesLock)
    {
        _valueRequests.Add(request)
    };
    await request.AwaitCompletion();
}
 

Кроме того, я предлагаю вам изучить возможность использования коллекции блоков, которая предназначена для обработки очереди производителей/потребителей и позволит вам избавиться от большинства ваших lock блоков.