#c# #system.threading.channels #system.io.pipelines
#c# #system.threading.channels #system.io.pipelines
Вопрос:
Я изучаю возможность использования конвейеров для обработки двоичных сообщений, поступающих из сети. Двоичные сообщения, которые я буду обрабатывать, поставляются с полезной нагрузкой, и желательно сохранить полезную нагрузку в ее двоичной форме.
Идея состоит в том, чтобы прочитать все сообщение и создать фрагмент сообщения и его полезную нагрузку, после того, как сообщение будет полностью прочитано, оно будет передано в цепочку каналов для обработки, обработка не будет мгновенной и может занять некоторое время или быть выполнена позже, и цель состоит в том, чтобы не заставлять устройство чтения каналов ждать завершения обработки, затем, как только обработка сообщения будет завершена, мне нужно будет освободить обработанную область буфера для записи канала.
Теперь, конечно, я мог бы просто создать новый массив байтов и скопировать данные, поступающие из pipe writer, но это превзошло бы цель отсутствия копирования? Итак, насколько я понимаю, мне потребуется некоторая синхронизация буфера между конвейером и каналом? Я наблюдал за доступными API (AdvanceTo) для чтения каналов, где можно сообщить читателю канала, что было использовано и что было проверено, но не могу понять, как это можно синхронизировать вне метода чтения канала.
Итак, вопрос будет заключаться в том, существуют ли какие-либо методы или примеры того, как этого можно достичь.
Ответ №1:
Буфер, полученный из TryRead
/ ReadAsync
, действителен только до тех пор, пока вы не вызовете AdvanceTo
, с ожиданием, что, как только вы это сделаете: все, что вы сообщили как потребленное, доступно для повторного использования для использования в другом месте (что может быть параллельным / одновременным чтением). Строго говоря: даже те биты, о которых вы не сообщили как о потребленных: вам все равно не следует считать действительными после вызова AdvanceTo
(хотя на самом деле, вполне вероятно, что они все равно будут одними и теми же сегментами — просто: это не забота вызывающего; для вызывающего это действительно только между чтением и продвижением).
Это означает, что вы явно не можете сделать:
while (...)
{
var result = await pipe.ReadAsync();
if (TryIdentifyFrameBoundary(out var frame)) {
BeginProcessingInBackground(frame); // <==== THIS IS A PROBLEM!
reader.AdvanceTo(frame.End, frame.End);
}
else if { // take nothing
reader.AdvanceTo(buffer.Start, buffer.End);
if (result.IsCompleted) break; // that's all folks
}
}
потому что бит «в фоновом режиме», когда он срабатывает, теперь может считывать чужие данные (из-за того, что он уже используется повторно).
Итак: либо вам нужно обработать содержимое фрейма как часть цикла чтения, либо вам придется сделать копию данных, скорее всего, с помощью:
c#
var len = checked ((int)buffer.Length);
var oversized = ArrayPool<byte>.Shared.Rent(len);
buffer.CopyTo(oversized);
и переходите oversized
к фоновой обработке, не забывая просматривать только первые len
байты. Вы могли передать это как ReadOnlyMemory<byte>
, но вам нужно учитывать, что впоследствии вы также захотите вернуть его в пул массивов (возможно, в finally
блоке), и передача его в виде памяти делает это немного более неудобным (но не невозможным, благодаря MemoryMarshal.TryGetArray
).
Примечание: в ранних версиях API конвейеров был элемент подсчета ссылок, который действительно позволял сохранять буферы, но у него было несколько проблем:
- это сильно усложнило API
- это привело к утечке буферов
- это было неоднозначно и сбивало с толку, что означало «сохранено»; это количество до тех пор, пока оно не будет повторно использовано? или выпущен полностью?
таким образом, эта функция была удалена.
Комментарии:
1. Понял, я бы поступил именно так, если бы не было найдено никаких решений. Один вопрос, поскольку у меня будут некоторые функции, которые будут создавать буфер неизвестного / переменного размера при вызове, было бы хорошей практикой передавать общий пул памяти в параметрах функции, чтобы функция могла выделить требуемый буфер? Вызывающая функция будет отвечать за освобождение буфера после его обработки. Это распространенный подход?
2. @NullReference честно говоря, если нет веской причины, я бы просто упростил и использовал общий пул буферов (
ArrayPool<T>.Shared
)3. Ну, да, это может быть общим, я проверю, есть ли у меня какие-либо веские причины для этого 🙂 Насколько я понимаю, выбор между ArrayPool и MemoryPool будет зависеть от базовых API, которые я буду использовать для анализа / обработки данных?
4. @NullReference на самом деле, на мой взгляд, нет; вы можете тривиально создать
[ReadOnly]Memory<T>
изT[]
, одновременно исправляя «негабаритную» вещь, иMemoryPool<T>
имеет накладные расходы на выделение дляIDisposable
. Честно говоря, в большинстве кодов очень мало причин использоватьMemoryPoolT<T>
вместо простоArrayPool<T>
. Когда это действительно становится интересным, это: когда вы хотите использовать неуправляемую память для своего пула.5. @AlexanderFarber да — ответил в контексте