Как вызвать метод GetOrAdd в .NET ConcurrentDictionary потокобезопасным способом?

#c# #.net #multithreading #collections #thread-safety

#c# #.net #многопоточность #Коллекции #потокобезопасность

Вопрос:

Я пытаюсь создать коллекцию, из keys которой .. если ключ не существует, тогда мне нужно DoSomeMethod() , а затем добавить ключ в коллекцию.

Проблема в том, что это должно иметь возможность обрабатывать несколько потоков, пытающихся добавить один и тот же ключ одновременно.

Если два потока имеют один и тот же ключ, то будет работать только один DoSomeMethod() , в то время как другой должен ждать.

Я рассматривал возможность использования ConcurrentDictionary метода and GetOrAdd (с параметром Func(..) param), но, похоже, оба они «запускаются» одновременно, если два потока имеют один и тот же ключ. Я думал, что реализация GetOrAdd будет

  • блокировка ‘key’
  • получить value из key .
  • если нет value , то делайте что угодно .. и теперь установите значение.
  • возврат value .
    … и любые другие key обращения будут ждать, пока не будет выполнена блокировка.

Похоже, что мой пользовательский метод, который GetOrAdd вызывает метод, не является потокобезопасным.

Документы MSDN также предполагают это?

Примечания
Если вы вызываете GetOrAdd одновременно в разных потоках, addValueFactory он может вызываться несколько раз, но его пара ключ / значение может не добавляться в словарь для каждого вызова


Пример покаяния: я копирую файлы из source в destination .

  • При копировании source файла проверьте коллекцию, если мы пытались проверить и создать папку назначения.
  • Если в коллекции нет key , проверьте, не существуют ли папки назначения… и создайте его, если он не существует.
  • После создания целевой папки сохраните имя / путь к этой папке в коллекции.
  • Повторите для всех файлов.

Таким образом, мы создаем папки назначения только в том случае, если мы этого еще не сделали.

Это похоже на то, что я хочу заблокировать КЛЮЧ коллекции….

Комментарии:

1. Вы должны объединить «check» и «do» в одной блокировке. Если один поток проверяет ключ и решает что-то сделать, второй поток может прервать «do» и попытаться сделать то же самое. Правильно ли я понимаю проблему или есть что-то еще?

2. Нет — 2-й поток не может прерываться, но должен ждать.

3. Я все еще не понимаю, в чем проблема. Почему параллельный словарь у вас не работает?

4. Да, именно так работает GetOrAdd. Это не гарантирует, что фабрика значений не будет вызвана дважды, только то, что вы гарантированно получите одно и то же значение обратно в обоих потоках. Проблема с предлагаемой вами реализацией заключается в том, что для нее требуется блокировка на довольно высоком уровне (каждый раз, когда проверяется ключ), и для этого требуется удерживать блокировку при вызове внешнего кода. Параллельные коллекции имеют «низкую блокировку», чтобы максимизировать пропускную способность в сценариях с высокой конкуренцией. Вызов внешнего кода при удержании блокировки плох, потому что он создает путь для мертвых блокировок.

5. Можете ли вы использовать SemaphoreSlim необязательно с таймаутом? Вы также можете проверить count на нем.

Ответ №1:

 public class OnceOnlyConcurrent<TKey, TValue>
{
    private readonly ConcurrentDictionary<TKey, Lazy<TValue>> _dictionary = new ConcurrentDictionary<TKey, Lazy<TValue>>();

    public TValue GetOrAdd(TKey key, Func<TValue> computation)
    {
        var result = _dictionary.AddOrUpdate(key, _ => new Lazy<TValue>(computation, LazyThreadSafetyMode.ExecutionAndPublication), (_, v) => v);
        return result.Value;
    }
}
  

Думаю, я должен немного описать это. По сути, здесь происходит то, что, хотя AddOrUpdate addValueFactory делегат всегда вызывается дважды, если AddOrUpdate одновременно встречаются два вызывающих абонента, оба этих вызова на самом деле ничего не делают, кроме как возвращают Lazy<T> ссылку, которая завершает вычисление.

Внутри AddOrUpdate оба результата будут записаны, но один будет удален. Только один экземпляр Lazy<T> будет возвращен обоим вызывающим AddOrUpdate , поэтому Lazy<T> вызываемое вычисление будет определяться одним экземпляром.

Затем, в следующей строке, когда мы запрашиваем .Value , это фактически запустит вычисление для одного из вызывающих пользователей этого пользовательского GetOrAdd интерфейса, а другой заблокируется, пока первый вычисляет — это функциональность второго аргумента для Lazy<T> ( LazyThreadSafteMode.ExecutionAndPublication ) . Кстати, это поведение по умолчанию Lazy<T> , поэтому вам действительно не нужен второй аргумент — я просто использовал его, чтобы быть более понятным в этом сообщении.

Конечно, этот код также может быть написан как метод расширения, но, к сожалению, вам нужно знать, чтобы создать словарь с Lazy<T> объектами внутри, поэтому я думаю, что это лучше как класс-оболочка ConcurrentDictionary<TKey, TValue> .

Комментарии:

1. Это решение, которое я собирался написать, единственное, что я бы сделал по-другому, это var result = _dictionary.GetOrAdd(key, _ => new Lazy<TValue>(computation)); сделать его короче.

Ответ №2:

В этом нет необходимости GetOrAdd . Достаточно простой проверки пути и наличия ключа:

 class FileWorker
{
    private object _sync;
    private IDictionary<string, Task> _destTasks;

    public FileWorker()
    {
        _sync = new object();
        _destTasks = new Dictionary<string, Task>();
    }

    public async Task Copy(IEnumerable<FileInfo> files, string destinationFolder)
    {
        await Task.WhenAll(files.Select(f => Copy(f, destinationFolder)));
    }

    private async Task CreateDestination(string path)
    {
        await Task.Run(() =>
        {
            if (!Directory.Exists(path))
            {
                Directory.CreateDirectory(path);
            }
        });
    }

    private Task Destination(string path)
    {
        lock(_sync)
        {
            if (!_destTasks.ContainsKey(path))
            {
                _destTasks[path] = CreateDestination(path);
            }
        }
        return _destTasks[path];
    }

    private async Task Copy(FileInfo file, string destinationFolder)
    {
        await Destination(destinationFolder).ContinueWith(task => file.CopyTo(Path.Combine(destinationFolder, file.Name), true));
    }
}

class Program
{
    static void Main(string[] args)
    {
        var file1 = new FileInfo("file1.tmp");
        using(var writer = file1.CreateText())
        {
            writer.WriteLine("file 1");
        }
        var file2 = new FileInfo("file2.tmp");
        using(var writer = file2.CreateText())
        {
            writer.WriteLine("file 2");
        }
        var worker = new FileWorker();
        worker.Copy(new[] { file1, file2 }, @"C:temp").Wait();
        Console.ReadLine();
    }
}
  

Комментарии:

1. Если Destination(string path) вызывается в двух потоках одновременно, вы будете вызывать CreateDestination(string path) дважды, чего пытается избежать OP

2. @ScottChamberlain, ты имел в виду ConcurrentDictionary. containsKey не является потокобезопасным?

3. Нет, ConcurrentDictionary.ContainsKey потокобезопасно, однако комбинация if(!_destTasks.ContainsKey(path)) { _destTasks[path] = CreateDestination(path); } не является. Поскольку эти две операции не являются атомарными, возможно ContainsKey(path) одновременное выполнение двух проверок, и обе возвращают true, вызывая два вызова CreateDestination(path) to . (это точная логика, которую существующая GetOrAdd выполняет, чего не хотел OP)

4. @Scott И именно поэтому я предложил перенести оба вызова в любой «тонкий» шкафчик. В этом случае вам даже не нужны параллельные коллекции.

5. @ScottChamberlain, я согласен, что возможно, что 2 проверки containsKey выполняются одновременно, потому что это то, к чему я стремился. одновременно могут выполняться 2 проверки, но возвращается только 1 задача. И это приводит к выполнению только 1 CreateDestination .