#c# #async-await
#c# #async-await
Вопрос:
У меня есть приведенный ниже код, который выполняется, когда Task.Delay(500000)
, но когда мой Task.Delay(5000)
, он не дает мне никакого результата, поскольку продолжительность времени для выполнения ожидаемого результата очень мала. Я ищу способ перепроектировать код, без которого я мог бы справиться с этим Task.Delay()
, поскольку мое время отклика может варьироваться для каждого выполнения. Как мне это сделать?
Примечание: изменен код с помощью метода, предложенного в ответе Roald. Более ранний запрос к задаче.Delay() не может обрабатываться асинхронно для подачи изменений. Наоборот, вместо Push-модели будет использоваться Pull model
using Microsoft.Azure.Cosmos;
using Microsoft.Azure.Documents.ChangeFeedProcessor;
using System;
using System.Collections.Generic;
using System.Net;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
namespace ConsoleApp1
{
public class ChangeFeedProcessorOptions
{
public int BufferCapacity { get; set; }
public string ProcessorName { get; set; }
public Container LeaseContainer { get; set; }
public string InstanceName { get; set; }
public DateTime StartTime { get; set; }
}
class Program
{
static async Task Main()
{
var client = new CosmosClient("AccountEndpoint = https://test.documents.azure.com:443/;AccountKey=oaEOA==;");
var database = client.GetDatabase("testDatabase");
var container = database.GetContainer("testContainer");
var options = new ChangeFeedProcessorOptions
{
BufferCapacity = 10,
InstanceName = "ChangeFeedInstanceName",
LeaseContainer = database.GetContainer("leases"),
ProcessorName = "ChangeFeedProcessorName",
StartTime = DateTime.Now.AddDays(-7).ToUniversalTime()
};
var count = 0;
await foreach (var doc in container.GetChangeFeed<document>(options))
{
Console.Write(doc, b: true);
count ;
if (count == 6)
{
break;
}
}
}
public static async IAsyncEnumerable<document> GetChangeFeed<document>(this Container self, ChangeFeedProcessorOptions options, [EnumeratorCancellation] CancellationToken cancellationToken = default)
{
var channel = Channel.CreateBounded<document>(new BoundedChannelOptions(options.BufferCapacity)
{
FullMode = BoundedChannelFullMode.Wait,
SingleReader = true,
SingleWriter = true
});
var processor = self
.GetChangeFeedProcessorBuilder<document>(options.ProcessorName, async (items, cancellation) =>
{
foreach (var item in items)
{
await channel.Writer.WriteAsync(item, cancellation);
}
})
.WithLeaseContainer(options.LeaseContainer)
.WithInstanceName(options.InstanceName)
.WithStartTime(options.StartTime)
.Build();
await processor.StartAsync();
try
{
await foreach (var item in channel.Reader.ReadAllAsync(cancellationToken))
{
yield return item;
}
}
finally
{
await processor.StopAsync();
}
}
}
}
Комментарии:
1. Это
ProcessChanges
то, что занимает все время? Может быть , вы могли бы что — нибудь сделать сTaskCompletionSource
или аSempahoreSlim
?2. Строка «await cfp.startAsync();» должна блокироваться до тех пор, пока не будут получены все данные (полный ответ) и задержка не потребуется. Если ожидание было удалено, то вам понадобится задержка.
3. Изменения процесса требуют времени, это делегат, который обрабатывает изменения. не могли бы вы подробнее рассказать о TaskCompletionSource
4. Он не работает с Task. Delay() удален
5. Какой смысл использовать
Task.Run
только для блокировки ожидания результата?Task.Delay()
в любом случае это не нужно. НезависимоStartAsync
от того, что иStopAsync
делать, не должно быть никакой необходимости в задержке внутри уже заблокированного вызова. Нужно было бы догадаться, что не так, поскольку соответствующий код отсутствует
Ответ №1:
Я думаю, что наиболее гибкий способ — сначала превратить поток изменений в IAsyncEnumerable, чтобы затем вы могли просто использовать linq или какой-нибудь прямой императивный код для его обработки.
Вы можете получить IAsyncEnumerable, используя этот метод расширения
Версия SDK <= 3.15.0
public record ChangeFeedProcessorOptions
{
public int BufferCapacity { get; init; }
public string ProcessorName { get; init; }
public Container LeaseContainer { get; init; }
public string InstanceName { get; init; }
public DateTime StartTime { get; init; }
}
public static async IAsyncEnumerable<T> GetChangeFeed<T>(this Container self, ChangeFeedProcessorOptions options, [EnumeratorCancellation] CancellationToken cancellationToken = default)
{
var channel = Channel.CreateBounded<T>(new BoundedChannelOptions(options.BufferCapacity)
{
FullMode = BoundedChannelFullMode.Wait,
SingleReader = true,
SingleWriter = true
});
var processor = self
.GetChangeFeedProcessorBuilder<T>(options.ProcessorName, async (items, cancellation) =>
{
foreach (var item in items)
{
await channel.Writer.WriteAsync(item, cancellation);
}
})
.WithLeaseContainer(options.LeaseContainer)
.WithInstanceName(options.InstanceName)
.WithStartTime(options.StartTime)
.Build();
await processor.StartAsync();
try
{
await foreach (var item in channel.Reader.ReadAllAsync(cancellationToken))
{
yield return item;
}
}
finally
{
await processor.StopAsync();
}
}
Версия SDK> 3.15.0
public record ChangeFeedProcessorOptions
{
public DateTime StartTime { get; init; }
public TimeSpan PollInterval { get; init; }
}
public static async IAsyncEnumerable<T> GetChangeFeed<T>(this Container self, ChangeFeedProcessorOptions options, [EnumeratorCancellation] CancellationToken cancellationToken = default)
{
var iterator = self.GetChangeFeedIterator<T>(ChangeFeedStartFrom.Time(options.StartTime));
while (iterator.HasMoreResults)
{
FeedResponse<T> items;
try
{
items = await iterator.ReadNextAsync(cancellationToken);
}
catch (CosmosException ex) when (ex.StatusCode == HttpStatusCode.NotModified)
{
// No changes
await Task.Delay(options.PollInterval, cancellationToken);
continue;
}
foreach (var item in items)
{
yield return item;
}
}
}
а затем используйте его следующим образом:
static async Task Main()
{
var client = new CosmosClient("AccountEndpoint = https://test.documents.azure.com:443/;AccountKey=oaEOA==;");
var database = client.GetDatabase("testDatabase");
var container = database.GetContainer("testContainer");
var options = new ChangeFeedProcessorOptions
{
BufferCapacity = 10,
InstanceName = ChangeFeedInstanceName,
LeaseContainer = database.GetContainer("leases"),
ProcessorName = ChangeFeedProcessorName,
StartTime = DateTime.Now.Subtract(MaxAge).ToUniversalTime()
};
var count = 0;
await foreach (var doc in container.GetChangeFeed<Recording>(options))
{
WriteObject(doc, b: true);
count ;
if (count == 6)
{
break;
}
}
}
или даже лучше, если вы добавите System.Linq.Async
await foreach (var doc in container.GetChangeFeed<Recording>(options).Take(6))
{
WriteObject(doc, b: true);
}
Для другой реализации вы также можете посмотреть здесь, она использует два семафора вместо канала для достижения того же результата.
Асинхронный код в командлетах powershell
Проблемы, с которыми вы сталкиваетесь, связаны с тем, что в командлете вызовы WriteObject
, WriteVerbose
, WriteWarning
, и т. Д. Должны Поступать из основного потока.
Чтобы решить эту проблему, вам нужно запустить перекачку сообщений ProcessRecord
и использовать ее для отправки обратно в основной поток, когда вам нужно вызвать любой из этих методов, именно то, что вам нужно было бы сделать в WinForm или WPF с помощью диспетчера.
Библиотека, которая заботится об этом, — это powersellasync
используя библиотеку, ваш код станет
[Cmdlet(VerbsCommon.Get, "ChangedRecording")]
[OutputType(typeof(Recording))]
public class SyncRecording : AsyncCmdlet
{
// ...
protected override async Task ProcessRecordAsync()
{
var container = ...;
await foreach (var doc in container.GetChangeFeed<Recording>(options).Take(6))
{
WriteObject(doc, b: true);
}
}
}
Комментарии:
1. привет, Роальд, после нескольких дней попыток я, наконец, перехожу к вашему подходу.. предложенный вами выше код содержит несколько ошибок, таких как общедоступная запись ChangeFeedProcessorOptions на какую сборку ссылается запись?
2. @AnkitKumar записи и свойства, доступные только для инициализации, являются функциями c # 9, вы можете прочитать о них здесь devblogs.microsoft.com/dotnet/c-9-0-on-the-record , если вы не можете использовать c # 9, просто замените ‘record’ на ‘class’ и ‘init’ на ‘set’.
3. @AnkitKumar да, я это тестировал .. вам нужно прочитать эти сообщения об ошибках! при компиляции вашего кода в нем говорится: «[CS1106] Метод расширения должен быть определен в неродовом статическом классе». GetChangeFeed является методом расширения и поэтому должен быть объявлен в статическом классе, но вы скопировали его в программу, которая не является статической. дополнительная информация в методах расширения: docs.microsoft.com/en-us/dotnet/csharp/programming-guide /…
4. осталось с одним контейнером ошибок. GetChangeFeedBuilder<document>(параметры)) , в нем говорится, что контейнер не содержит определения для GetChangeFeedBuilder . Какие пакеты вы используете
5. да, это верно, доступен GetChangeFeedProcessorBuilder, но он выдает ошибку, поскольку не получает определение для GetAsyncEnumerator . Если вы тестировали код, как вы получили GetChangedFeedBuilder??