Сохранение буфера конвейеров до завершения обработки

#c# #system.threading.channels #system.io.pipelines

#c# #system.threading.channels #system.io.pipelines

Вопрос:

Я изучаю возможность использования конвейеров для обработки двоичных сообщений, поступающих из сети. Двоичные сообщения, которые я буду обрабатывать, поставляются с полезной нагрузкой, и желательно сохранить полезную нагрузку в ее двоичной форме.

Идея состоит в том, чтобы прочитать все сообщение и создать фрагмент сообщения и его полезную нагрузку, после того, как сообщение будет полностью прочитано, оно будет передано в цепочку каналов для обработки, обработка не будет мгновенной и может занять некоторое время или быть выполнена позже, и цель состоит в том, чтобы не заставлять устройство чтения каналов ждать завершения обработки, затем, как только обработка сообщения будет завершена, мне нужно будет освободить обработанную область буфера для записи канала.

Теперь, конечно, я мог бы просто создать новый массив байтов и скопировать данные, поступающие из pipe writer, но это превзошло бы цель отсутствия копирования? Итак, насколько я понимаю, мне потребуется некоторая синхронизация буфера между конвейером и каналом? Я наблюдал за доступными API (AdvanceTo) для чтения каналов, где можно сообщить читателю канала, что было использовано и что было проверено, но не могу понять, как это можно синхронизировать вне метода чтения канала.

Итак, вопрос будет заключаться в том, существуют ли какие-либо методы или примеры того, как этого можно достичь.

Ответ №1:

Буфер, полученный из TryRead / ReadAsync , действителен только до тех пор, пока вы не вызовете AdvanceTo , с ожиданием, что, как только вы это сделаете: все, что вы сообщили как потребленное, доступно для повторного использования для использования в другом месте (что может быть параллельным / одновременным чтением). Строго говоря: даже те биты, о которых вы не сообщили как о потребленных: вам все равно не следует считать действительными после вызова AdvanceTo (хотя на самом деле, вполне вероятно, что они все равно будут одними и теми же сегментами — просто: это не забота вызывающего; для вызывающего это действительно только между чтением и продвижением).

Это означает, что вы явно не можете сделать:

 while (...)
{
    var result = await pipe.ReadAsync();
    if (TryIdentifyFrameBoundary(out var frame)) {
        BeginProcessingInBackground(frame); // <==== THIS IS A PROBLEM!
        reader.AdvanceTo(frame.End, frame.End);
    }
    else if { // take nothing
        reader.AdvanceTo(buffer.Start, buffer.End);
        if (result.IsCompleted) break; // that's all folks
    }
}
  

потому что бит «в фоновом режиме», когда он срабатывает, теперь может считывать чужие данные (из-за того, что он уже используется повторно).

Итак: либо вам нужно обработать содержимое фрейма как часть цикла чтения, либо вам придется сделать копию данных, скорее всего, с помощью:

 c#
var len = checked ((int)buffer.Length);
var oversized = ArrayPool<byte>.Shared.Rent(len);
buffer.CopyTo(oversized);
  

и переходите oversized к фоновой обработке, не забывая просматривать только первые len байты. Вы могли передать это как ReadOnlyMemory<byte> , но вам нужно учитывать, что впоследствии вы также захотите вернуть его в пул массивов (возможно, в finally блоке), и передача его в виде памяти делает это немного более неудобным (но не невозможным, благодаря MemoryMarshal.TryGetArray ).


Примечание: в ранних версиях API конвейеров был элемент подсчета ссылок, который действительно позволял сохранять буферы, но у него было несколько проблем:

  • это сильно усложнило API
  • это привело к утечке буферов
  • это было неоднозначно и сбивало с толку, что означало «сохранено»; это количество до тех пор, пока оно не будет повторно использовано? или выпущен полностью?

таким образом, эта функция была удалена.

Комментарии:

1. Понял, я бы поступил именно так, если бы не было найдено никаких решений. Один вопрос, поскольку у меня будут некоторые функции, которые будут создавать буфер неизвестного / переменного размера при вызове, было бы хорошей практикой передавать общий пул памяти в параметрах функции, чтобы функция могла выделить требуемый буфер? Вызывающая функция будет отвечать за освобождение буфера после его обработки. Это распространенный подход?

2. @NullReference честно говоря, если нет веской причины, я бы просто упростил и использовал общий пул буферов ( ArrayPool<T>.Shared )

3. Ну, да, это может быть общим, я проверю, есть ли у меня какие-либо веские причины для этого 🙂 Насколько я понимаю, выбор между ArrayPool и MemoryPool будет зависеть от базовых API, которые я буду использовать для анализа / обработки данных?

4. @NullReference на самом деле, на мой взгляд, нет; вы можете тривиально создать [ReadOnly]Memory<T> из T[] , одновременно исправляя «негабаритную» вещь, и MemoryPool<T> имеет накладные расходы на выделение для IDisposable . Честно говоря, в большинстве кодов очень мало причин использовать MemoryPoolT<T> вместо просто ArrayPool<T> . Когда это действительно становится интересным, это: когда вы хотите использовать неуправляемую память для своего пула.

5. @AlexanderFarber да — ответил в контексте