Рефакторинг словаря в ConcurrentDictionary

#c# #.net #multithreading #concurrency #concurrent-programming

#c# #.net #многопоточность #параллелизм #concurrentdictionary

Вопрос:

Я хочу сделать свой код многопоточным, для этого мне нужно изменить словарь на ConcurrentDictionary . Я читал о ConcurrentDictionary , проверил некоторый пример, но все же мне нужна помощь в этом:

Вот исходный код (для одного потока)

 private IDictionary<string, IDictionary<string, Task>> _tasks;

public override IDictionary<string, IDictionary<string, Task>> Tasks
{
    get
    {
        // return dictionary from cache unless too old
        // concurrency!! (null check)
        if (_tasks != null amp;amp; (DateTime.Now - _lastTaskListRefreshDateTime < TimeSpan.FromSeconds(30)))
        {
            return _tasks;
        }

        // reload dictionary from database
        _tasks = new Dictionary<string, IDictionary<string, Task>>();

        // find returns an IEnumerable<Task>
        var tasks = Find<Task>(null, DependencyNode.TaskForCrawler).Cast<Task>();

        // build hierarchical dictionary from flat IEnumerable
        // concurrency!!
        foreach (var t in tasks)
        {

            if (_tasks.ContainsKey(t.Area.Key))
            {
                if (_tasks[t.Area.Key] == null)
                {
                    _tasks[t.Area.Key] = new Dictionary<string, Task>();
                }

                if (!_tasks[t.Area.Key].ContainsKey(t.Key))
                {
                    _tasks[t.Area.Key].Add(t.Key, t);
                }
            }
            else
            {
                _tasks.Add(t.Area.Key, new Dictionary<string, Task> { { t.Key, t } });
            }
        }

        _lastTaskListRefreshDateTime = DateTime.Now;
        return _tasks;
    }

    set
    {
        _tasks = value;
    }
}
  

Вот что я придумал:

 private ConcurrentDictionary<string, ConcurrentDictionary<string, Task>> _tasks = new ConcurrentDictionary<string, ConcurrentDictionary<string, Task>>();

public override ConcurrentDictionary<string, ConcurrentDictionary<string, Task>> Tasks
{
    get
    {
        // use cache
        // concurrency?? (null check)
        if (!_tasks.IsEmpty amp;amp; (DateTime.Now - _lastTaskListRefreshDateTime < TimeSpan.FromSeconds(30)))
        {
            return _tasks;
        }

        // reload
        var tasks = Find<Task>(null, DependencyNode.TaskForCrawler).Cast<Task>();

        foreach (var task in tasks)
        {
            var t = task; // inner scope for clousure
            var taskKey = t.Key;
            var areaKey = t.Area.Key;

            var newDict = new ConcurrentDictionary<string, Task>();
            newDict.TryAdd(taskKey, t);

            _tasks.AddOrUpdate(areaKey, newDict, (k, v) => {
                                                    // An dictionary element if key=areaKey already exists
                                                    // extend and return it.
                                                    v.TryAdd(taskKey, t);
                                                    return v;
                                                   });
        }

        _lastTaskListRefreshDateTime = DateTime.Now;
        return _tasks;
    }
}
  

Я не уверен, что это именно то, в частности, я совершенно уверен, что IsEmpty проверка не является потокобезопасной, поскольку _tasks возможно, была инициализирована между IsEmpty проверкой и amp;amp; ... частью или return _tasks part. Должен ли я блокировать эту проверку вручную? Нужна ли мне двойная блокировка (проверка на ноль> блокировка> проверка на ноль)?

Комментарии:

1. Безопасно ли возвращать частично заполненный _tasks ? Если нет, вам придется заблокировать все это, пока вы заполняете его в своем цикле.

Ответ №1:

Ваше беспокойство оправдано. Tasks Средство получения свойств не является потокобезопасным. Здесь есть несколько проблем.

Во-первых, как и на вашей стороне, существует гонка между вызовом IsEmpty из одного потока и удалением элемента из другого потока. Получатель может вернуть пустой словарь.

Во-вторых, существует разница между чтением _lastTaskListRefreshDateTime в if проверке и присвоением в конце getter. Даже если эти операции являются атомарными (чего они не могут быть, по крайней мере, на 32-разрядных платформах, поскольку DateTime они 64-разрядные), все еще существует небольшая проблема с барьером памяти, поскольку в коде не видно механизмов синхронизации, подобных volatile .

В-третьих, подобно моему объяснению выше, существует еще одна проблема с барьером памяти, связанная со _tasks ссылкой. Один поток может вызывать установщик, в то время как другой вызывает получатель. Поскольку барьер памяти отсутствует, среда CLR или аппаратное обеспечение могут свободно оптимизировать операции чтения и записи таким образом, чтобы изменения, внесенные в установщик, не были видны получателю. Эта проблема не обязательно может вызвать какие-либо проблемы, но я уверен, что это поведение, которое не было ожидаемо. Не имея другого контекста для анализа, я не могу сказать ни того, ни другого.

Комментарии:

1. Понятно. Поэтому лучше всего было бы заблокировать весь Task получатель. Блокировка только цикла foreach (как предложено Гейбом) не решила бы проблему с _lastTaskListRefreshDateTime … верно?

2. мое решение вытекает из предложения Гейба заблокировать цикл заполнения и идей Брайана, особенно гонки на _lastTaskListRefreshDateTime , поэтому то, что я придумал, — это блокировка всего средства получения.

Ответ №2:

ConcurrentDictionary Только гарантирует, что чтение и запись в словарь не будут перекрывать друг друга, чего Dictionary класс не делает. Потокобезопасность в ConcurrentDictionary не делает ваш код потокобезопасным, она только гарантирует, что его код потокобезопасен. Поскольку это так, вам понадобится блокировка в вашем получателе.