Перехват исключений при использовании async / await и ContinueWith

#c# #exception #task-parallel-library

#c# #исключение #задача-параллельная-библиотека

Вопрос:

Я создаю общий абстрактный класс (.NET Framework 4.6.1), который будет получать элементы из рабочей очереди (в данном случае Amazon SQS, но в конечном итоге источник не имеет значения) и выполнять какую-то обработку для каждого элемента. У меня есть объявленные абстрактные методы, в которых наследники должны переопределять, чтобы выполнять свою работу. Я хочу иметь возможность выполнять эти методы параллельно, используя задачи, а затем вызывать события, чтобы указать, прошла обработка успешно или нет. Это урезанная версия того, что у меня есть. Цель состоит в том, чтобы перехватывать и необработанные исключения, ProcessReceivedItemAsync создаваемые производным классом, а затем вызывать событие.

 public class SimpleSqsItemProcessor<T> : IItemProcessor<T>
{
    public event EventHandler<SqsMessageEventArgs<T>> ProcessingCompleted;
    public event EventHandler<SqsMessageProcessingFailedEventArgs<T>> ProcessingFailed;

    // ProcessingStatus is an enum with values like "Success", "Failure", etc
    public abstract Task<ProcessingStatus> ProcessReceivedItemAsync(SqsWrapper<T> item);

    public async Task<int> ProcessNextSetAsync(CancellationToken cancellationToken)
    {
        // SqsWrapper has SQS message metadata, like Message ID, receipt handle, etc
        Queue<SqsWrapper<T>> items = await GetFromSqs();
        int count = 0;
        if (items.Count == 0)
            return count;

        List<Task> tasks = new List<Task>(items.Count);
        while (items.Count > 0)
        {
            if (cancellationToken.IsCancellationRequested)
            {
                items.Clear();
                return count;
            }

            SqsWrapper<T> next = items.Dequeue();
            count  = 1;

            tasks.Add(ProcessReceivedItemAsync(next).ContinueWith(async t =>
            {
                try
                {
                    ProcessingStatus status = await t;

                    if (status == ProcessingStatus.Success)
                        // this method simply deletes from SQS
                        await _successHandler.HandleSuccessAsync(next);

                    // this method invokes the ProcessingCompleted event
                    OnProcessingCompleted(next);
                }
                catch (Exception ex)
                {
                    // this method invokes the ProcessingFailed event
                    OnProcessingFailed(next, ex);
                }
            }));
        }

        await Task.WhenAll(tasks);
        return count;
    }
}
  

Тогда я могу создать производный класс следующим образом:

 public class MyProcessor : SimpleSqsItemProcessor<MyObject>
{
    public override Task<ProcessingStatus> ProcessReceivedItemAsync(SqsWrapper<MyObject> item)
    {
        Random rnd = new Random();
        int i = rnd.Next(0, 1);
        if (i == 0)
            throw new Exception("OH NOES!");
        return Task.FromResult(ProcessingStatus.Success);
    }
}
  

Проблема в том, что при отладке мы никогда не вводим блок catch ProcessNextSetAsync при возникновении исключения, потому что я думал, что исключения запускаются при await выполнении задачи. Даже если я оберну await Task.WhenAll оператор в try-catch, моя точка останова в блоке catch не будет достигнута. Вместо этого всплывает исключение «О НЕТ!».

Обратите внимание, что я не могу изменить возвращаемый тип ProcessReceivedItemAsync метода на сам элемент, поскольку интерфейс, который я реализую, является старым кодом и давно установлен. Я знаю, что мог await бы вызывать эти методы по отдельности, но я хочу, чтобы все они выполнялись параллельно. Я использовал этот метод раньше, когда моя предыдущая задача не вызывала никаких исключений, но это первый раз, когда я делаю это так. Я хочу убедиться, что ProcessingFailed событие будет вызвано, поэтому я не хочу предполагать, что производные классы будут это делать

Вопросы:

  1. Где я могу перехватить исключение, создаваемое производным классом?
  2. Как я могу гарантировать, что смогу связать обрабатываемый объект с генерируемым исключением?

Заранее спасибо за любую информацию, которую вы можете предложить.

Комментарии:

1. Просто не используйте ContinueWith вообще. Это чрезвычайно сложно использовать должным образом. Практически всегда лучше использовать await для добавления продолжений к задачам. Когда вы смешиваете и сопоставляете два, это еще хуже , и очень затрудняет создание правильного кода и затрудняет понимание этого кода, даже когда он функционирует должным образом.

2. @Servy Да, я определенно читал это раньше, но ожидание каждого отдельного вызова метода — это то же самое, что запускать их все последовательно, с той лишь разницей, что основной поток не заблокирован.

3. Затем вы создаете новый метод для создания задачи, представляющей операцию, для которой вы хотите иметь Task значение. Вам не нужно пытаться уместить всю вашу программу в один метод.

4. Рассматривали ли вы возможность использования существующей библиотеки обработки, например, потока данных TPL , вместо создания собственной системы обработки с нуля?

5. @TheodorZoulias Я раньше не слышал об этой библиотеке. Я обязательно посмотрю. Спасибо!