Перепроектирование и удаление задачи.Delay() из моей функции C #

#c# #async-await

#c# #async-await

Вопрос:

У меня есть приведенный ниже код, который выполняется, когда Task.Delay(500000) , но когда мой Task.Delay(5000) , он не дает мне никакого результата, поскольку продолжительность времени для выполнения ожидаемого результата очень мала. Я ищу способ перепроектировать код, без которого я мог бы справиться с этим Task.Delay() , поскольку мое время отклика может варьироваться для каждого выполнения. Как мне это сделать?

Примечание: изменен код с помощью метода, предложенного в ответе Roald. Более ранний запрос к задаче.Delay() не может обрабатываться асинхронно для подачи изменений. Наоборот, вместо Push-модели будет использоваться Pull model

 
    using Microsoft.Azure.Cosmos;
    using Microsoft.Azure.Documents.ChangeFeedProcessor;
    using System;
    using System.Collections.Generic;
    using System.Net;
    using System.Runtime.CompilerServices;
    using System.Threading;
    using System.Threading.Channels;
    using System.Threading.Tasks;
    
    namespace ConsoleApp1
    {
        public class ChangeFeedProcessorOptions
        {
            public int BufferCapacity { get; set; }
            public string ProcessorName { get; set; }
            public Container LeaseContainer { get; set; }
            public string InstanceName { get; set; }
            public DateTime StartTime { get; set; }
        }
        
        class Program
        {
    
            
    
            static async Task Main()
            {
                var client = new CosmosClient("AccountEndpoint = https://test.documents.azure.com:443/;AccountKey=oaEOA==;");
    
                var database = client.GetDatabase("testDatabase");
                var container = database.GetContainer("testContainer");
    
                var options = new ChangeFeedProcessorOptions
                {
                    BufferCapacity = 10,
                    InstanceName = "ChangeFeedInstanceName",
                    LeaseContainer = database.GetContainer("leases"),
                    ProcessorName = "ChangeFeedProcessorName",
                    StartTime = DateTime.Now.AddDays(-7).ToUniversalTime()
                };
    
                var count = 0;
                await foreach (var doc in container.GetChangeFeed<document>(options))
                {
                    Console.Write(doc, b: true);
    
                    count  ;
    
                    if (count == 6)
                    {
                        break;
                    }
                }
            }     
    
    
            
    
        public static async IAsyncEnumerable<document> GetChangeFeed<document>(this Container self, ChangeFeedProcessorOptions options, [EnumeratorCancellation] CancellationToken cancellationToken = default)
        {
            var channel = Channel.CreateBounded<document>(new BoundedChannelOptions(options.BufferCapacity)
            {
                FullMode = BoundedChannelFullMode.Wait,
                SingleReader = true,
                SingleWriter = true
            });
    
            var processor = self
                .GetChangeFeedProcessorBuilder<document>(options.ProcessorName, async (items, cancellation) =>
                {
                    foreach (var item in items)
                    {
                        await channel.Writer.WriteAsync(item, cancellation);
                    }
                })
                .WithLeaseContainer(options.LeaseContainer)
                .WithInstanceName(options.InstanceName)
                .WithStartTime(options.StartTime)
                .Build();
    
            await processor.StartAsync();
            try
            {
                await foreach (var item in channel.Reader.ReadAllAsync(cancellationToken))
                {
                    yield return item;
                }
            }
            finally
            {
                await processor.StopAsync();
            }
        }
    }
    }

 

Комментарии:

1. Это ProcessChanges то, что занимает все время? Может быть , вы могли бы что — нибудь сделать с TaskCompletionSource или а SempahoreSlim ?

2. Строка «await cfp.startAsync();» должна блокироваться до тех пор, пока не будут получены все данные (полный ответ) и задержка не потребуется. Если ожидание было удалено, то вам понадобится задержка.

3. Изменения процесса требуют времени, это делегат, который обрабатывает изменения. не могли бы вы подробнее рассказать о TaskCompletionSource

4. Он не работает с Task. Delay() удален

5. Какой смысл использовать Task.Run только для блокировки ожидания результата? Task.Delay() в любом случае это не нужно. Независимо StartAsync от того, что и StopAsync делать, не должно быть никакой необходимости в задержке внутри уже заблокированного вызова. Нужно было бы догадаться, что не так, поскольку соответствующий код отсутствует

Ответ №1:

Я думаю, что наиболее гибкий способ — сначала превратить поток изменений в IAsyncEnumerable, чтобы затем вы могли просто использовать linq или какой-нибудь прямой императивный код для его обработки.

Вы можете получить IAsyncEnumerable, используя этот метод расширения

Версия SDK <= 3.15.0

 public record ChangeFeedProcessorOptions
{
    public int BufferCapacity { get; init; }
    public string ProcessorName { get; init; }
    public Container LeaseContainer { get; init; }
    public string InstanceName { get; init; }
    public DateTime StartTime { get; init; }
}

public static async IAsyncEnumerable<T> GetChangeFeed<T>(this Container self, ChangeFeedProcessorOptions options, [EnumeratorCancellation] CancellationToken cancellationToken = default)
{
    var channel = Channel.CreateBounded<T>(new BoundedChannelOptions(options.BufferCapacity)
    {
        FullMode = BoundedChannelFullMode.Wait,
        SingleReader = true,
        SingleWriter = true
    });

    var processor = self
        .GetChangeFeedProcessorBuilder<T>(options.ProcessorName, async (items, cancellation) =>
        {
            foreach (var item in items)
            {
                await channel.Writer.WriteAsync(item, cancellation);
            }
        })
        .WithLeaseContainer(options.LeaseContainer)
        .WithInstanceName(options.InstanceName)
        .WithStartTime(options.StartTime)
        .Build();

    await processor.StartAsync();
    try
    {
        await foreach (var item in channel.Reader.ReadAllAsync(cancellationToken))
        {
            yield return item;
        }
    }
    finally
    {
        await processor.StopAsync();
    }
}
 

Версия SDK> 3.15.0

 public record ChangeFeedProcessorOptions
{
    public DateTime StartTime { get; init; }
    public TimeSpan PollInterval { get; init; }
}

public static async IAsyncEnumerable<T> GetChangeFeed<T>(this Container self, ChangeFeedProcessorOptions options, [EnumeratorCancellation] CancellationToken cancellationToken = default)
{
    var iterator = self.GetChangeFeedIterator<T>(ChangeFeedStartFrom.Time(options.StartTime));

    while (iterator.HasMoreResults)
    {
        FeedResponse<T> items;

        try
        {
            items = await iterator.ReadNextAsync(cancellationToken);
        }
        catch (CosmosException ex) when (ex.StatusCode == HttpStatusCode.NotModified)
        {
            // No changes
            await Task.Delay(options.PollInterval, cancellationToken);
            continue;
        }

        foreach (var item in items)
        {
            yield return item;
        }
    }
}
 

а затем используйте его следующим образом:

 static async Task Main()
{
    var client = new CosmosClient("AccountEndpoint = https://test.documents.azure.com:443/;AccountKey=oaEOA==;");

    var database = client.GetDatabase("testDatabase");
    var container = database.GetContainer("testContainer");

    var options = new ChangeFeedProcessorOptions
    {
        BufferCapacity = 10,
        InstanceName = ChangeFeedInstanceName,
        LeaseContainer = database.GetContainer("leases"),
        ProcessorName = ChangeFeedProcessorName,
        StartTime = DateTime.Now.Subtract(MaxAge).ToUniversalTime()
    };
    
    var count = 0;
    await foreach (var doc in container.GetChangeFeed<Recording>(options))
    {
        WriteObject(doc, b: true);
        
        count  ;
        
        if (count == 6)
        {
            break;
        }
    }
}
 

или даже лучше, если вы добавите System.Linq.Async

 await foreach (var doc in container.GetChangeFeed<Recording>(options).Take(6))
{
    WriteObject(doc, b: true);
}
 

Для другой реализации вы также можете посмотреть здесь, она использует два семафора вместо канала для достижения того же результата.

Асинхронный код в командлетах powershell

Проблемы, с которыми вы сталкиваетесь, связаны с тем, что в командлете вызовы WriteObject , WriteVerbose , WriteWarning , и т. Д. Должны Поступать из основного потока.
Чтобы решить эту проблему, вам нужно запустить перекачку сообщений ProcessRecord и использовать ее для отправки обратно в основной поток, когда вам нужно вызвать любой из этих методов, именно то, что вам нужно было бы сделать в WinForm или WPF с помощью диспетчера.
Библиотека, которая заботится об этом, — это powersellasync

используя библиотеку, ваш код станет

 [Cmdlet(VerbsCommon.Get, "ChangedRecording")]
[OutputType(typeof(Recording))]
public class SyncRecording : AsyncCmdlet
{
    // ...
    
    protected override async Task ProcessRecordAsync()
    {
        var container = ...;
        await foreach (var doc in container.GetChangeFeed<Recording>(options).Take(6))
        {
            WriteObject(doc, b: true);
        }
    }
}
 

Комментарии:

1. привет, Роальд, после нескольких дней попыток я, наконец, перехожу к вашему подходу.. предложенный вами выше код содержит несколько ошибок, таких как общедоступная запись ChangeFeedProcessorOptions на какую сборку ссылается запись?

2. @AnkitKumar записи и свойства, доступные только для инициализации, являются функциями c # 9, вы можете прочитать о них здесь devblogs.microsoft.com/dotnet/c-9-0-on-the-record , если вы не можете использовать c # 9, просто замените ‘record’ на ‘class’ и ‘init’ на ‘set’.

3. @AnkitKumar да, я это тестировал .. вам нужно прочитать эти сообщения об ошибках! при компиляции вашего кода в нем говорится: «[CS1106] Метод расширения должен быть определен в неродовом статическом классе». GetChangeFeed является методом расширения и поэтому должен быть объявлен в статическом классе, но вы скопировали его в программу, которая не является статической. дополнительная информация в методах расширения: docs.microsoft.com/en-us/dotnet/csharp/programming-guide /…

4. осталось с одним контейнером ошибок. GetChangeFeedBuilder<document>(параметры)) , в нем говорится, что контейнер не содержит определения для GetChangeFeedBuilder . Какие пакеты вы используете

5. да, это верно, доступен GetChangeFeedProcessorBuilder, но он выдает ошибку, поскольку не получает определение для GetAsyncEnumerator . Если вы тестировали код, как вы получили GetChangedFeedBuilder??