#c# #asynchronous #concurrency #solace #taskcompletionsource
Вопрос:
Я прочитал ряд статей и вопросов здесь , в StackOverflow, о том, как обернуть API на основе обратного вызова с помощью API на Task
основе a TaskCompletionSource
, и я пытаюсь использовать такую технику при общении с брокером сообщений Solace PubSub .
Мое первоначальное наблюдение состояло в том, что этот метод, похоже, перекладывает ответственность за параллелизм. Например, в библиотеке брокера утешения есть Send()
метод, который может быть заблокирован, а затем мы получаем обратный вызов после завершения сетевого взаимодействия, чтобы указать «реальный» успех или неудачу. Таким образом, этот Send()
метод может быть вызван очень быстро, и библиотека поставщика ограничивает параллелизм внутри.
Когда вы ставите задачу вокруг этого, кажется, что вы либо сериализуете операции ( foreach message await SendWrapperAsync(message)
), либо сами берете на себя ответственность за параллелизм, решая, сколько задач запускать (например, используя поток данных TPL).
В любом случае, я решил завершить Send
вызов с гарантом, который будет повторять попытку вечно, пока обратный вызов не укажет на успех, а также возьмет на себя ответственность за параллелизм. Это «гарантированная» система обмена сообщениями. Неудача — это не вариант. Для этого требуется, чтобы гарант мог применить противодавление, но на самом деле это не входит в сферу данного вопроса. У меня есть пара комментариев по этому поводу в моем примере кода ниже.
Это означает, что мой горячий путь, который обертывает обратный вызов send , «очень горячий» из-за логики повторных попыток. И поэтому здесь много TaskCompletionSource
творения.
В собственной документации поставщика содержатся рекомендации по повторному использованию их Message
объектов, где это возможно, а не по их повторному созданию для каждого Send
. Я решил использовать a Channel
в качестве кольцевого буфера для этого. Но это заставило меня задуматься — есть ли какая — то альтернатива этому TaskCompletionSource
подходу-может быть, какой-то другой объект, который также можно кэшировать в кольцевом буфере и использовать повторно, добиваясь того же результата?
Я понимаю, что это, вероятно, чрезмерная попытка микрооптимизации, и, честно говоря, я изучаю несколько аспектов C#, которые выше моего уровня оплаты (на самом деле я специалист по SQL), поэтому я могу упустить что-то очевидное. Если ответ будет «на самом деле вам не нужна эта оптимизация», это не успокоит меня. Если бы ответ был «это действительно единственный разумный способ», мое любопытство было бы удовлетворено.
Вот полностью функционирующее консольное приложение, которое имитирует поведение библиотеки Solace в MockBroker
объекте, и моя попытка обернуть его. Мой горячий путь-это SendOneAsync
метод в Guarantor
классе. Код, вероятно, немного длинноват для ЭТОГО, но это самая минимальная демонстрация, которую я мог бы создать, которая охватывает все важные элементы.
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
internal class Message { public bool sent; public int payload; public object correlator; }
// simulate third party library behaviour
internal class MockBroker
{
public bool TrySend(Message m, Action<Message> callback)
{
if (r.NextDouble() < 0.5) return false; // simulate chance of immediate failure / "would block" response
Task.Run(() => { Thread.Sleep(100); m.sent = r.NextDouble() < 0.5; callback(m); }); // simulate network call
return true;
}
private Random r = new();
}
// Turns MockBroker into a "guaranteed" sender with an async concurrency limit
internal class Guarantor
{
public Guarantor(int maxConcurrency)
{
_broker = new MockBroker();
// avoid message allocations in SendOneAsync
_ringBuffer = Channel.CreateBounded<Message>(maxConcurrency);
for (int i = 0; i < maxConcurrency; i ) _ringBuffer.Writer.TryWrite(new Message());
}
// real code pushing into a T.T.T.DataFlow block with bounded capacity and parallelism
// execution options both equal to maxConcurrency here, providing concurrency and backpressure
public async Task Post(int payload) => await SendOneAsync(payload);
private async Task SendOneAsync(int payload)
{
Message msg = await _ringBuffer.Reader.ReadAsync();
msg.payload = payload;
// send must eventually succeed
while (true)
{
// *** can this allocation be avoided? ***
var tcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
msg.correlator = tcs;
// class method in real code, inlined here to make the logic more apparent
Action<Message> callback = (msg) => (msg.correlator as TaskCompletionSource<bool>).SetResult(msg.sent);
if (_broker.TrySend(msg, callback) amp;amp; await tcs.Task) break;
else
{
// simple demo retry logic
Console.WriteLine($"retrying {msg.payload}");
await Task.Delay(500);
}
}
// real code raising an event here to indicate successful delivery
await _ringBuffer.Writer.WriteAsync(msg);
Console.WriteLine(payload);
}
private Channel<Message> _ringBuffer;
private MockBroker _broker;
}
internal class Program
{
private static async Task Main(string[] args)
{
// at most 10 concurrent sends
Guarantor g = new(10);
// hacky simulation since in this demo there's nothing generating continuous events,
// no DataFlowBlock providing concurrency (it will be limited by the Channel instead),
// and nobody to notify when messages are successfully sent
List<Task> sends = new(100);
for (int i = 0; i < 100; i ) sends.Add(g.Post(i));
await Task.WhenAll(sends);
}
}
Ответ №1:
Да, вы можете избежать выделения TaskCompletionSource
экземпляров, используя облегченные ValueTask
s вместо Task
s. Сначала вам нужен многоразовый объект, который может реализовать IValueTaskSource<T>
интерфейс, и Message
он кажется идеальным кандидатом. Для реализации этого интерфейса вы можете использовать ManualResetValueTaskSourceCore<T>
структуру. Это изменяемая структура, поэтому ее не следует объявлять как readonly
. Вам просто нужно делегировать методы интерфейса соответствующим методам этой структуры с очень длинным именем:
using System.Threading.Tasks.Sources;
internal class Message : IValueTaskSource<bool>
{
public bool sent; public int payload; public object correlator;
private ManualResetValueTaskSourceCore<bool> _source; // Mutable struct, not readonly
public void Reset() => _source.Reset();
public short Version => _source.Version;
public void SetResult(bool result) => _source.SetResult(result);
ValueTaskSourceStatus IValueTaskSource<bool>.GetStatus(short token)
=> _source.GetStatus(token);
void IValueTaskSource<bool>.OnCompleted(Action<object> continuation,
object state, short token, ValueTaskSourceOnCompletedFlags flags)
=> _source.OnCompleted(continuation, state, token, flags);
bool IValueTaskSource<bool>.GetResult(short token) => _source.GetResult(token);
}
Эти три члена GetStatus
OnCompleted
и GetResult
необходимы для реализации интерфейса. Остальные три члена ( Reset
, Version
и SetResult
) будут использоваться для создания и управления ValueTask<bool>
s.
Теперь давайте обернем TrySend
метод MockBroker
класса в асинхронный метод TrySendAsync
, который возвращает ValueTask<bool>
static class MockBrokerExtensions
{
public static ValueTask<bool> TrySendAsync(this MockBroker source, Message message)
{
message.Reset();
bool result = source.TrySend(message, m => m.SetResult(m.sent));
if (!result) message.SetResult(false);
return new ValueTask<bool>(message, message.Version);
}
}
message.Reset();
Сбрасывает значение IValueTaskSource<bool>
и объявляет , что предыдущая асинхронная операция завершена. A IValueTaskSource<T>
поддерживает только одну асинхронную операцию за раз, произведенную ValueTask<T>
операцию можно ожидать только один раз, и ее больше нельзя ожидать после следующей Reset()
. Это цена, которую вы должны заплатить за то, чтобы избежать выделения объекта: вы должны следовать более строгим правилам. Если вы попытаетесь нарушить правила (намеренно или непреднамеренно), ManualResetValueTaskSourceCore<T>
они начнут разбрасываться InvalidOperationException
повсюду.
Теперь давайте воспользуемся методом TrySendAsync
расширения:
while (true)
{
if (await _broker.TrySendAsync(msg)) break;
// simple demo retry logic
Console.WriteLine($"retrying {msg.payload}");
await Task.Delay(500);
}
Вы можете распечатать Console
GC.GetTotalAllocatedBytes(true)
до и после всей операции, чтобы увидеть разницу. Обязательно запустите приложение в режиме выпуска, чтобы увидеть реальную картину. Вы можете увидеть, что разница не такая уж впечатляющая , потому что размер TaskCompletionSource
экземпляра довольно мал по сравнению с байтами, выделяемыми Task.Delay
и всеми string
s, созданными для записи материала в Console
.
Комментарии:
1. Убиваю его, мистер Зулиас. Я отправился на поиски дополнительной информации об этом
ManualResetValueTaskCore
и прочитал предложение Стивена Тоуба, которое дало мне некоторую дополнительную основу, хотя деталей, безусловно, для меня слишком много. Если Стивен Тоуб говорит, что что-то «сложно», я возьму то, что они создали для меня, большое вам спасибо! 🙂2. @allmhuran по названию структуры можно сказать
ManualResetValueTaskSourceCore<T>
, что она не предназначалась для повседневного использования. 😃