Потокобезопасный интеллектуальный указатель для чтения и записи в C , x86-64

#c #thread-safety #x86 #smart-pointers #lock-free

#c #потокобезопасность #x86 #интеллектуальные указатели #без блокировки

Вопрос:

Я разрабатываю некоторую структуру данных без блокировки, и возникает следующая проблема.

У меня есть поток записи, который создает объекты в куче и оборачивает их в интеллектуальный указатель со счетчиком ссылок. У меня также есть много потоков чтения, которые работают с этими объектами. Код может выглядеть следующим образом:

 SmartPtr ptr;

class Reader : public Thread {
    virtual void Run {
       for (;;) {
           SmartPtr local(ptr);
           // do smth   
       }
    }   
};

class Writer : public Thread {
    virtual void Run {
       for (;;) {
           SmartPtr newPtr(new Object);    
           ptr = newPtr;  
       }
    }
};

int main() {
    Pool* pool = SystemThreadPool();
    pool->Run(new Reader());
    pool->Run(new Writer());
    for (;;) // wait for crash :(
}
  

Когда я создаю локальную копию потока ptr , это означает, по крайней мере,

  1. Считывает адрес.
  2. Увеличить счетчик ссылок.

Я не могу выполнить эти две операции атомарно, и поэтому иногда мои читатели работают с удаленным объектом.

Вопрос в том, какой интеллектуальный указатель я должен использовать, чтобы обеспечить доступ для чтения и записи из нескольких потоков с правильным управлением памятью? Решение должно существовать, поскольку программисты Java даже не заботятся о такой проблеме, просто полагаясь на то, что все объекты являются ссылками и удаляются только тогда, когда их никто не использует.

Для PowerPC я нашелhttp://drdobbs.com/184401888 , выглядит красиво, но использует инструкции, связанные с загрузкой, и инструкции с условным сохранением, которых у нас нет в x86.

Насколько я понимаю, указатели boost предоставляют такую функциональность только с использованием блокировок. Мне нужно решение без блокировки.

Комментарии:

1. std::shared_ptr ? (Или, boost::shared_ptr если ваша реализация его еще не поддерживает.)

2. Что-то не так с boost::shared_ptr ?

3. Я почти уверен, что JVM использует блокировки. Является ли скорость выполнения причиной, по которой вы хотите избежать блокировок? Проверьте предыдущие вопросы, такие как c:Codeboost_1_47_0boost ; но мне интересно, не нужно ли вам выбирать либо решение на основе блокировки, зависящее от конкретных инструкций или языковой поддержки C 11, либо небезопасное решение, такое как то, с которым вы работаете.

4. Почему без блокировки? Это бутылочное горлышко вашего приложения?

5. Я не думаю, что std ::shared_ptr поддерживает атомарные назначения.

Ответ №1:

boost ::shared_ptr имеет atomic_store, который использует «безблокировочную» блокировку, которая должна быть достаточно быстрой для 99% возможных случаев.

     boost::shared_ptr<Object> ptr;
class Reader : public Thread {
    virtual void Run {
       for (;;) {
           boost::shared_ptr<Object> local(boost::atomic_load(amp;ptr));
           // do smth   
       }
    }   
};

class Writer : public Thread {
    virtual void Run {
       for (;;) {
           boost::shared_ptr<Object> newPtr(new Object);    
           boost::atomic_store(amp;ptr, newPtr);
       }
    }
};

int main() {
    Pool* pool = SystemThreadPool();
    pool->Run(new Reader());
    pool->Run(new Writer());
    for (;;)
}
  

Редактировать:

В ответ на комментарий ниже, реализация находится в «boost / shared_ptr.hpp»…

 template<class T> void atomic_store( shared_ptr<T> * p, shared_ptr<T> r )
{
    boost::detail::spinlock_pool<2>::scoped_lock lock( p );
    p->swap( r );
}

template<class T> shared_ptr<T> atomic_exchange( shared_ptr<T> * p, shared_ptr<T> r )
{
    boost::detail::spinlock amp; sp = boost::detail::spinlock_pool<2>::spinlock_for( p );

    sp.lock();
    p->swap( r );
    sp.unlock();

    return r; // return std::move( r )
}
  

Комментарии:

1. Да, покажите нам реализацию atomic_swap, и ваш ответ будет полным.

2. Реализация находится в boost / shared_ptr.hpp. Это простая спин-блокировка, как я упоминал в своем ответе.

3. 1 Спасибо. Я предложил реализацию без блокировки в другом ответе, но блокировка вращения была бы проще. Так же хорошо, при условии, что он не возвращается к блокировке мьютекса после определенного количества вращений.

4. Просто убедитесь, что вы запускаете потоки на разных ядрах, иначе спин-блокировка довольно контрпродуктивна, а резервный вариант мьютекса на самом деле может быть очень полезным!

5. Мой совет заключается в том, что вы сохраняете его простым, вы НЕ увидите никакой разницы между этой блокировкой вращения и решением без блокировки, если у вас не много конкурирующих потоков ( 8 или более). Помните, что инструкция CAS не является бесплатной, поэтому, если у вас низкая конкуренция, спин-блокировка может быть быстрее. Это мои два цента по этому вопросу. Удачи с любым решением, решение RobH будет без блокировки.

Ответ №2:

С некоторыми хитростями вы должны быть в состоянии выполнить это, используя InterlockedCompareExchange128. Храните счетчик ссылок и указатель в массиве __int64 из 2 элементов. Если счетчик ссылок находится в массиве [0], а указатель в массиве [1], атомарное обновление будет выглядеть следующим образом:

 while(true)
{
    __int64 comparand[2];
    comparand[0] = refCount;
    comparand[1] = pointer;
    if(1 == InterlockedCompareExchange128(
        array,
        pointer,
        refCount   1,
        comparand))
    {
        // Pointer is ready for use. Exit the while loop.
    }
}
  

Если встроенная функция InterlockedCompareExchange128 недоступна для вашего компилятора, вы можете вместо этого использовать базовую инструкцию CMPXCHG16B, если вы не возражаете возиться с языком ассемблера.

Комментарии:

1. Разве InterlockedCompareExchange64 не должен быть достаточным для x86?

2. Я вижу, что инструкция CMPXCHG16B доступна в наборе команд x64-64, поэтому любая встроенная функция компилятора будет работать. Или OP может погрузиться в раздел __asm: -o

3. @zennehoy Да, это было бы. Однако OP имеет 64-разрядную версию, поэтому InterlockedCompareExchange128 необходим для работы как с указателем, так и с подсчетом ссылок атомарно.

4. Эмм, -1? У кого-нибудь есть проблемы с этим ответом?

5. x86-64 == x64, поэтому будет доступен InterlockedCompareExchange128 . Я просто говорю, что решение будет работать как на x86, так и на x64, либо с I-C-E-64 или -128 соответственно. Понятия не имею, почему это получило отрицательный результат, проголосуйте за.

Ответ №3:

Решение, предложенное RobH, не работает. У него та же проблема, что и в исходном вопросе: при доступе к объекту подсчета ссылок он, возможно, уже был удален.

Единственный способ, который я вижу для решения проблемы без глобальной блокировки (как в boost::atomic_store ) или условных инструкций чтения / записи, — это каким-то образом отсрочить уничтожение объекта (или общего объекта подсчета ссылок, если такая вещь используется). Итак, у зеннехоя есть хорошая идея, но его метод слишком небезопасен.

Я мог бы сделать это, сохранив копии всех указателей в потоке записи, чтобы автор мог контролировать уничтожение объектов:

 class Writer : public Thread {
    virtual void Run() {
        list<SmartPtr> ptrs; //list that holds all the old ptr values        

        for (;;) {
            SmartPtr newPtr(new Object);
            if(ptr)
                ptrs.push_back(ptr); //push previous pointer into the list
            ptr = newPtr;

            //Periodically go through the list and destroy objects that are not
            //referenced by other threads
            for(auto it=ptrs.begin(); it!=ptrs.end(); )
                if(it->refCount()==1)
                    it = ptrs.erase(it);
                else
                      it;
       }
    }
};
  

Однако все еще существуют требования к классу интеллектуального указателя. Это не работает с shared_ptr, поскольку операции чтения и записи не являются атомарными. Он почти работает с boost::intrusive_ptr. Назначение для intrusive_ptr реализовано следующим образом (псевдокод):

 //create temporary from rhs
tmp.ptr = rhs.ptr;
if(tmp.ptr)
    intrusive_ptr_add_ref(tmp.ptr);

//swap(tmp,lhs)
T* x = lhs.ptr;
lhs.ptr = tmp.ptr;
tmp.ptr = x;

//destroy temporary
if(tmp.ptr)
    intrusive_ptr_release(tmp.ptr);
  

Насколько я понимаю, единственное, чего здесь не хватает, — это ограничение памяти на уровне компилятора раньше lhs.ptr = tmp.ptr; . С добавлением этого, как чтение, так rhs и запись lhs будут потокобезопасными при строгих условиях: 1) архитектура x86 или x64 2) подсчет атомарных ссылок 3) rhs пересчет не должен равняться нулю во время назначения (гарантируется приведенным выше кодом записи) 4) запись только в один поток lhs (используя CAS, у вас может быть несколько записей).

В любом случае, вы могли бы создать свой собственный класс интеллектуальных указателей на основе intrusive_ptr с необходимыми изменениями. Определенно проще, чем повторная реализация shared_ptr. И, кроме того, если вам нужна производительность, лучше всего использовать intrusive.

Комментарии:

1. Спасибо, ваш ответ очень похож на то, что я думаю.

Ответ №4:

Причина, по которой это работает намного проще в Java, заключается в сборке мусора. В C вы должны вручную убедиться, что значение не только начинает использоваться другим потоком, когда вы хотите его удалить.

Решение, которое я использовал в аналогичной ситуации, состоит в том, чтобы просто отложить удаление значения. Я создаю отдельный поток, который перебирает список объектов, подлежащих удалению. Когда я хочу что-то удалить, я добавляю это в этот список с меткой времени. Поток удаления ожидает до некоторого фиксированного времени после этой временной метки, прежде чем фактически удалить значение. Вам просто нужно убедиться, что задержка достаточно велика, чтобы гарантировать, что любое временное использование значения завершено.

в моем случае было бы достаточно 100 миллисекунд, я выбрал несколько секунд, чтобы быть в безопасности.

Комментарии:

1. Звучит для меня как довольно сложное и рискованное решение.

2. Это было единственное решение, которое я мог придумать для удаления узлов в связанном списке без блокировки. Проблема заключалась в том, что к тому времени, когда узел был помечен для удаления, другой поток мог начать его использовать (например, путем итерации к нему). Если вы знаете какие-либо альтернативы, я хотел бы услышать о них!

3. И мои, и RobHs являются лучшими альтернативами.