#c# #multithreading #thread-safety #deadlock
#c# #многопоточность #безопасность потоков #взаимоблокировка
Вопрос:
Я использую сетевую библиотеку, которая использует волокна. Волокно гарантирует, что все требуемые действия выполняются синхронизированным и упорядоченным образом:
interface IFiber
{
Enqeue(Action action)
}
У каждого подключенного узла есть свое волокно запроса, в пределах которого он выполняет все свои операции.
У меня также есть одно волокно уровня приложения.
Каждый узел имеет свой (постоянный) объект данных и работает с ним. Но мне также нужно получить к нему доступ из внешнего однорангового контекста и даже когда он отключен (и его волокно автоматически удалено).
Поэтому я должен каким-то образом поддерживать словарь и передавать qeued действия между волокнами, когда они заменяются на прикладное волокно (одноранговое соединение отключено) или новое волокно (одноранговое соединение подключено).
Я думаю сохранить класс исполнителя для каждой сущности. Исполнитель инициирует действия, (отменяет) RegisterFiber и выполняет действия внутри текущего волокна.
public class Executor
{
readonly object _lock = new object();
readonly IFiber _applicaitonFiber;
IFiber _currentFiber;
Action _actions;
public Executor(IFiber applicaitonFiber)
{
_currentFiber = _applicaitonFiber = applicaitonFiber;
}
public void SetFiber(IFiber fiber)
{
lock (_lock)
{
var fiberLocal = _currentFiber = fiber ?? _applicaitonFiber;
if (_actions != null)
_currentFiber.Enqueue(() => Execute(fiberLocal));
}
}
public void Enqeue(Action action)
{
if (action == null) throw new ArgumentNullException("action");
lock (_lock)
{
bool start = _actions == null;
_actions = action;
var fiberLocal = _currentFiber;
if (start)
_currentFiber.Enqueue(() => Execute(fiberLocal));
}
}
void Execute(IFiber currentFiber)
{
lock (_lock)
{
if (currentFiber != _currentFiber) return;
var a = _actions;
if (a == null) return;
_actions = null;
// I can't release lock here. What if new fiber is registered before it is executed?
a();
}
}
}
Вопрос в том, как я могу заблокировать регистрацию нового волокна, пока выполняется действие на ранее зарегистрированном волокне.
Рассмотрим этот пример взаимоблокировки:
- Поток A: выполняет действия над объектом 1, переключение волокон заблокировано монитором.
- Поток B делает то же самое с объектом 2.
- О: действие 1 требует доступа / замены волокон для объекта 2. Он ожидает, пока B снимет блокировку.
- B: действие 2 требует того же для объекта 1. Он ожидает A.
Я думаю, что возможное решение — сделать метод SetFiber асинхронным и выполнять все операции через _applicationFiber .
public class Executor
{
readonly object _lock = new object();
readonly IFiber _applicationFiber;
IFiber _currentFiber;
Action _actions;
public Executor(IFiber applicaitonFiber)
{
_currentFiber = _applicationFiber = applicaitonFiber;
}
public IOperationResult<bool> SetFiber(IFiber fiber)
{
var r = new OperationResult<bool>();
_applicationFiber.Enqueue(
() =>
{
lock (_lock)
{
var fiberLocal = _currentFiber = fiber ?? _applicationFiber;
if (_actions != null)
_currentFiber.Enqueue(() => Execute(fiberLocal));
r.Result = true; // async event
}
});
return r;
}
public void Enqeue(Action action)
{
if (action == null) throw new ArgumentNullException("action");
_applicationFiber.Enqueue(
() =>
{
lock (_lock)
{
bool start = _actions == null;
_actions = action;
var fiberLocal = _currentFiber;
if (start)
_currentFiber.Enqueue(() => Execute(fiberLocal));
}
});
}
void Execute(IFiber currentFiber)
{
lock (_lock)
{
if (currentFiber != _currentFiber) return; // replaced
var a = _actions;
if (a == null) return;
_actions = null;
a();
}
}
}
Но я все еще не уверен в этом решении. Что, если мне нужно выполнить большой запрос к базе данных изнутри действия? Он может приостановить работу всего приложения fiber до тех пор, пока блокировка не будет снята.
Есть ли какие-либо шаблоны, которые я могу применить здесь?
Ответ №1:
Я думаю, это должно сработать:
using System;
using System.Threading;
using ExitGames.Concurrency.Fibers;
public class EntityFiberManager
{
readonly object _executionLock = new object();
//readonly object _enqueueLock = new object();
readonly IFiber _applicationFiber;
IFiber _currentFiber;
volatile Action _actions;
public EntityFiberManager(IFiber applicaitonFiber)
{
_currentFiber = _applicationFiber = applicaitonFiber;
}
/// <summary>
/// Removes the current set fiber if it's equal to <paramref name="fiber"/>.
/// All queued actions will be rerouted to the application fiber.
/// Can be called from anywhere.
/// Disposed fiber should never be set with <see cref="AcquireForNewFiber"/> again.
/// Doesn't block.
/// </summary>
public void ReleaseForDisposedFiber(IFiber fiber)
{
if (fiber == null) throw new ArgumentNullException("fiber");
ReleaseForDisposedFiberInternal(fiber);
}
private void ReleaseForDisposedFiberInternal(IFiber fiber)
{
if ((_executingEntityFiberManager != null amp;amp; _executingEntityFiberManager != this) || !Monitor.TryEnter(_executionLock, 1))
{
_applicationFiber.Enqueue(() => ReleaseForDisposedFiberInternal(fiber));
return;
}
try
{
//lock (_enqueueLock)
//{
if (_currentFiber != fiber) return;
_currentFiber = null;
Thread.MemoryBarrier(); // do not reorder!
if (_actions != null)
_applicationFiber.Enqueue(() => Execute(null));
//}
}
finally
{
Monitor.Exit(_executionLock);
}
}
/// <summary>
/// Sets a new fiber.
/// All queued actions will be rerouted to that fiber.
/// Can be called from anywhere except from another Executor queud action.
/// Blocks until the current execution of queued actions is not finished.
/// </summary>
public void AcquireForNewFiber(IFiber fiber)
{
if (fiber == null) throw new ArgumentNullException("fiber");
if (_executingEntityFiberManager != null amp;amp; _executingEntityFiberManager != this)
throw new InvalidOperationException("Can't call this method on from queued actions on another instance");
lock (_executionLock)
//lock (_enqueueLock)
{
if (_currentFiber == fiber) return;
var fiberLocal = _currentFiber = fiber;
Thread.MemoryBarrier(); // do not reorder!
if (_actions != null)
fiberLocal.Enqueue(() => Execute(fiberLocal));
}
}
/// <summary>
/// Enqueus an action to the current fiber.
/// Doesn't block.
/// </summary>
public void Enqeue(Action action)
{
if (action == null) throw new ArgumentNullException("action");
//lock (_enqueueLock)
//{
// we could add another lock
// but we just need to ensure
// that we properly detect when previous queue was empty
// also delegate are immutable so we exchange references
Action currentActions;
Action newActions;
do
{
Thread.Sleep(0);
currentActions = _actions;
newActions = currentActions action;
}
while (Interlocked.CompareExchange(ref _actions, newActions, currentActions) != currentActions);
bool start = currentActions == null;
if (start)
{
// that's why we would want to use _enqueueLock
// we don't want the current fiber to be replaced
// imagine that when replacing queue was empty
// than we read the fiber
var fiber = _currentFiber;
Thread.MemoryBarrier();
if (fiber == null)
fiber = _applicationFiber;
// and then replace writes its new fiber to memory
// we have a wrong fiber here
// and Execute will just quit
// and all next Enqueue calls will do nothing
// but now it's fixed with MemoryBarrier call. I think so.
fiber.Enqueue(() => Execute(fiber));
}
//}
}
[ThreadStatic]
static EntityFiberManager _executingEntityFiberManager;
void Execute(IFiber currentFiber)
{
lock (_executionLock)
{
if (currentFiber != _currentFiber) return; // replaced
var actions = Interlocked.Exchange(ref _actions, null);
if (actions == null) return;
if (_executingEntityFiberManager != null)
throw new InvalidOperationException("Already in execution process");
_executingEntityFiberManager = this;
try
{
actions();
}
finally
{
_executingEntityFiberManager = null;
}
}
}
}