Эксклюзивный доступ к объекту перекрестного потока в пределах двух сменных волокон потока

#c# #multithreading #thread-safety #deadlock

#c# #многопоточность #безопасность потоков #взаимоблокировка

Вопрос:

Я использую сетевую библиотеку, которая использует волокна. Волокно гарантирует, что все требуемые действия выполняются синхронизированным и упорядоченным образом:

 interface IFiber 
{    
    Enqeue(Action action)
}
  

У каждого подключенного узла есть свое волокно запроса, в пределах которого он выполняет все свои операции.

У меня также есть одно волокно уровня приложения.

Каждый узел имеет свой (постоянный) объект данных и работает с ним. Но мне также нужно получить к нему доступ из внешнего однорангового контекста и даже когда он отключен (и его волокно автоматически удалено).

Поэтому я должен каким-то образом поддерживать словарь и передавать qeued действия между волокнами, когда они заменяются на прикладное волокно (одноранговое соединение отключено) или новое волокно (одноранговое соединение подключено).

Я думаю сохранить класс исполнителя для каждой сущности. Исполнитель инициирует действия, (отменяет) RegisterFiber и выполняет действия внутри текущего волокна.

 public class Executor
{
    readonly object _lock = new object();
    readonly IFiber _applicaitonFiber;
    IFiber _currentFiber;
    Action _actions;

    public Executor(IFiber applicaitonFiber)
    {
        _currentFiber = _applicaitonFiber = applicaitonFiber;
    }

    public void SetFiber(IFiber fiber)
    {
        lock (_lock)
        {
            var fiberLocal = _currentFiber = fiber ?? _applicaitonFiber;
            if (_actions != null)
                _currentFiber.Enqueue(() => Execute(fiberLocal));
        }
    }

    public void Enqeue(Action action)
    {
        if (action == null) throw new ArgumentNullException("action");
        lock (_lock)
        {
            bool start = _actions == null;
            _actions  = action;
            var fiberLocal = _currentFiber;
            if (start)
                _currentFiber.Enqueue(() => Execute(fiberLocal));
        }
    }

    void Execute(IFiber currentFiber)
    {
        lock (_lock)
        {
            if (currentFiber != _currentFiber) return;
            var a = _actions;
            if (a == null) return;
            _actions = null;
            // I can't release lock here. What if new fiber is registered before it is executed?
            a();
        }
    }
}
  

Вопрос в том, как я могу заблокировать регистрацию нового волокна, пока выполняется действие на ранее зарегистрированном волокне.

Рассмотрим этот пример взаимоблокировки:

  1. Поток A: выполняет действия над объектом 1, переключение волокон заблокировано монитором.
  2. Поток B делает то же самое с объектом 2.
  3. О: действие 1 требует доступа / замены волокон для объекта 2. Он ожидает, пока B снимет блокировку.
  4. B: действие 2 требует того же для объекта 1. Он ожидает A.

Я думаю, что возможное решение — сделать метод SetFiber асинхронным и выполнять все операции через _applicationFiber .

 public class Executor
{
    readonly object _lock = new object();
    readonly IFiber _applicationFiber;
    IFiber _currentFiber;
    Action _actions;

    public Executor(IFiber applicaitonFiber)
    {
        _currentFiber = _applicationFiber = applicaitonFiber;
    }

    public IOperationResult<bool> SetFiber(IFiber fiber)
    {
        var r = new OperationResult<bool>();
        _applicationFiber.Enqueue(
            () =>
            {
                lock (_lock)
                {
                    var fiberLocal = _currentFiber = fiber ?? _applicationFiber;
                    if (_actions != null)
                        _currentFiber.Enqueue(() => Execute(fiberLocal));
                    r.Result = true; // async event
                }
            });
        return r;
    }

    public void Enqeue(Action action)
    {
        if (action == null) throw new ArgumentNullException("action");
        _applicationFiber.Enqueue(
            () =>
            {
                lock (_lock)
                {
                    bool start = _actions == null;
                    _actions  = action;
                    var fiberLocal = _currentFiber;
                    if (start)
                        _currentFiber.Enqueue(() => Execute(fiberLocal));
                }
            });
    }

    void Execute(IFiber currentFiber)
    {
        lock (_lock)
        {
            if (currentFiber != _currentFiber) return; // replaced
            var a = _actions;
            if (a == null) return;
            _actions = null;
            a();
        }
    }
}
  

Но я все еще не уверен в этом решении. Что, если мне нужно выполнить большой запрос к базе данных изнутри действия? Он может приостановить работу всего приложения fiber до тех пор, пока блокировка не будет снята.

Есть ли какие-либо шаблоны, которые я могу применить здесь?

Ответ №1:

Я думаю, это должно сработать:

 using System;
using System.Threading;
using ExitGames.Concurrency.Fibers;

    public class EntityFiberManager
    {
        readonly object _executionLock = new object();
        //readonly object _enqueueLock = new object();
        readonly IFiber _applicationFiber;
        IFiber _currentFiber;
        volatile Action _actions;

        public EntityFiberManager(IFiber applicaitonFiber)
        {
            _currentFiber = _applicationFiber = applicaitonFiber;
        }

        /// <summary>
        /// Removes the current set fiber if it's equal to <paramref name="fiber"/>.
        /// All queued actions will be rerouted to the application fiber.
        /// Can be called from anywhere.
        /// Disposed fiber should never be set with <see cref="AcquireForNewFiber"/> again.
        /// Doesn't block.
        /// </summary>
        public void ReleaseForDisposedFiber(IFiber fiber)
        {
            if (fiber == null) throw new ArgumentNullException("fiber");
            ReleaseForDisposedFiberInternal(fiber);
        }

        private void ReleaseForDisposedFiberInternal(IFiber fiber)
        {
            if ((_executingEntityFiberManager != null amp;amp; _executingEntityFiberManager != this) || !Monitor.TryEnter(_executionLock, 1))
            {
                _applicationFiber.Enqueue(() => ReleaseForDisposedFiberInternal(fiber));
                return;
            }
            try
            {
                //lock (_enqueueLock)
                //{
                if (_currentFiber != fiber) return;
                _currentFiber = null;
                Thread.MemoryBarrier(); // do not reorder!
                if (_actions != null)
                    _applicationFiber.Enqueue(() => Execute(null));
                //}
            }
            finally
            {
                Monitor.Exit(_executionLock);
            }
        }

        /// <summary>
        /// Sets a new fiber. 
        /// All queued actions will be rerouted to that fiber.
        /// Can be called from anywhere except from another Executor queud action.
        /// Blocks until the current execution of queued actions is not finished.
        /// </summary>
        public void AcquireForNewFiber(IFiber fiber)
        {
            if (fiber == null) throw new ArgumentNullException("fiber");
            if (_executingEntityFiberManager != null amp;amp; _executingEntityFiberManager != this) 
                throw new InvalidOperationException("Can't call this method on from queued actions on another instance");
            lock (_executionLock)
            //lock (_enqueueLock)
            {
                if (_currentFiber == fiber) return;
                var fiberLocal = _currentFiber = fiber;
                Thread.MemoryBarrier(); // do not reorder!
                if (_actions != null)
                    fiberLocal.Enqueue(() => Execute(fiberLocal));
            }
        }

        /// <summary>
        /// Enqueus an action to the current fiber.
        /// Doesn't block.
        /// </summary>
        public void Enqeue(Action action)
        {
            if (action == null) throw new ArgumentNullException("action");

            //lock (_enqueueLock)
            //{
            // we could add another lock
            // but we just need to ensure
            // that we properly detect when previous queue was empty

            // also delegate are immutable so we exchange references

            Action currentActions;
            Action newActions;
            do
            {
                Thread.Sleep(0);
                currentActions = _actions;
                newActions = currentActions   action;
            }
            while (Interlocked.CompareExchange(ref _actions, newActions, currentActions) != currentActions);

            bool start = currentActions == null;

            if (start)
            {
                // that's why we would want to use _enqueueLock
                // we don't want the current fiber to be replaced
                // imagine that when replacing queue was empty
                // than we read the fiber
                var fiber = _currentFiber;
                Thread.MemoryBarrier();
                if (fiber == null)
                    fiber = _applicationFiber;
                // and then replace writes its new fiber to memory
                // we have a wrong fiber here
                // and Execute will just quit
                // and all next Enqueue calls will do nothing
                // but now it's fixed with MemoryBarrier call. I think so.
                fiber.Enqueue(() => Execute(fiber));
            }
            //}
        }

        [ThreadStatic]
        static EntityFiberManager _executingEntityFiberManager;

        void Execute(IFiber currentFiber)
        {
            lock (_executionLock)
            {
                if (currentFiber != _currentFiber) return; // replaced
                var actions = Interlocked.Exchange(ref _actions, null);
                if (actions == null) return;
                if (_executingEntityFiberManager != null)
                    throw new InvalidOperationException("Already in execution process");
                _executingEntityFiberManager = this;
                try
                {
                    actions();
                }
                finally
                {
                    _executingEntityFiberManager = null;
                }
            }
        }
    }