#c #boost #stl #thread-safety
#c #повышение #stl #потокобезопасность
Вопрос:
Кажется, все работает, но я не уверен, что это лучший способ добиться этого.
В принципе, у меня есть объект, который выполняет асинхронный поиск данных. Этот объект имеет вектор указателей, которые выделяются и отменяются в главном потоке. Используя функции boost, обратный вызов результатов процесса привязывается к одному из указателей в этом векторе. При запуске он будет запущен в некотором произвольном потоке и изменит данные указателя.
Теперь у меня есть критические разделы вокруг частей, которые вставляются в вектор и стираются на случай, если объект асинхронного извлечения получает больше запросов, но мне интересно, нужна ли мне какая-то защита в обратном вызове, которая также изменяет данные указателя.
Надеюсь, этот уменьшенный псевдокод прояснит ситуацию:
class CAsyncRetriever
{
// typedefs of boost functions
class DataObject
{
// methods and members
};
public:
// Start single asynch retrieve with completion callback
void Start(SomeArgs)
{
SetupRetrieve(SomeArgs);
LaunchRetrieves();
}
protected:
void SetupRetrieve(SomeArgs)
{
// ...
{ // scope for data lock
boost::lock_guard<boost::mutex> lock(m_dataMutex);
m_inProgress.push_back(SmartPtr<DataObject>(new DataObject)));
m_callback = boost::bind(amp;CAsyncRetriever::ProcessResults, this, _1, m_inProgress.back());
}
// ...
}
void ProcessResults(DataObject* data)
{
// CALLED ON ANOTHER THREAD ... IS THIS SAFE?
data->m_SomeMember.SomeMethod();
data->m_SomeOtherMember = SomeStuff;
}
void Cleanup()
{
// ...
{ // scope for data lock
boost::lock_guard<boost::mutex> lock(m_dataMutex);
while(!m_inProgress.empty() amp;amp; m_inProgress.front()->IsComplete())
m_inProgress.erase(m_inProgress.begin());
}
// ...
}
private:
std::vector<SmartPtr<DataObject>> m_inProgress;
boost::mutex m_dataMutex;
// other members
};
Редактировать: Это фактический код для обратного вызова ProccessResults (плюс комментарии для вашей пользы)
void ProcessResults(CRetrieveResults* pRetrieveResults, CRetData* data)
{
// pRetrieveResults is delayed binding that server passes in when invoking callback in thread pool
// data is raw pointer to ref counted object in vector of main thread (the DataObject* in question)
// if there was an error set the code on the atomic int in object
data->m_nErrorCode.Store_Release(pRetrieveResults->GetErrorCode());
// generic iterator of results bindings for generic sotrage class item
TPackedDataIterator<GenItem::CBind> dataItr(amp;pRetrieveResults->m_DataIter);
// namespace function which will iterate results and initialize generic storage
GenericStorage::InitializeItems<GenItem>(amp;data->m_items, dataItr, pRetrieveResults->m_nTotalResultsFound); // this is potentially time consuming depending on the amount of results and amount of columns that were bound in storage class definition (i.e.about 8 seconds for a million equipment items in release)
// atomic uint32_t that is incremented when kicking off async retrieve
m_nStarted.Decrement(); // this one is done processing
// boost function completion callback bound to interface that requested results
data->m_complete(data->m_items);
}
Комментарии:
1. Вам нужно добавить больше информации, если вы ожидаете разумного ответа. В настоящее время отсутствуют большие важные биты, в частности, что такое и как
SomeMethod
,IsComplete
реализовано и как поднят флаг завершения . Если последняя строка функции обработки является присваиванием, если только это не пользовательский тип, и она блокируется, и для нее установленоIsComplete
значение true, ответ — нет, это небезопасно. Но я предполагаю, что эта функция обработки — всего лишь скелет.2. да, многое из этого — скелет, я просто хотел показать, что он вызывает методы и назначает члены внутри него. На данный момент я думаю, что со мной все будет в порядке, пока сохраняется семантика наличия только одного потока на обратный вызов для каждого указателя. Инкрементные асинхронные извлечения приводят к слишком большой задержке, поэтому я пока не беспокоюсь об этой реализации.
3. По сути, в этом подходе нет ничего неправильного, но дьявол кроется в деталях. В зависимости от того, каковы фактические операции и как выполняется синхронизация с общими данными (я предполагаю, что общие данные — это всего лишь флаг, проверяемый
IsComplete
), тогда это может быть правильным. Является ли это лучшим подходом или может быть упрощено, это другой вопрос, но без контекста невозможно сказать. Вещи, которые могут иметь значение: является ли реализацияSmartPtr
потокобезопасной? Там существует высокий потенциал для состояния гонки.4. Кстати,
ProcessResults
вероятно, должно бытьstatic
, если это не так,static
вы делитесьthis
указателем, и тогда вам также нужно добавить блокировки внутриProcessResults
для каждого объекта-члена.5. Вероятно, вы правы … однако я думаю, что я просто перестану передавать асинхронный ретривер в обратном вызове завершения, поскольку это на самом деле не нужно, и тогда единственным другим используемым элементом будет atomic, так что все будет в порядке.
Ответ №1:
В нынешнем виде, похоже, что Cleanup
код может уничтожить объект, для которого выполняется обратный вызов ProcessResults
. Это вызовет проблемы при отмене ссылки на указатель при обратном вызове.
Мое предложение состояло бы в том, чтобы вы расширили семантику вашего m_dataMutex
, чтобы охватить обратный вызов, хотя, если обратный вызов выполняется долго или может выполняться встроенным внутри SetupRetrieve
(иногда это случается — хотя здесь вы заявляете, что обратный вызов выполняется в другом потоке, и в этом случае вы в порядке), тогда все сложнее. В настоящее время m_dataMutex
немного запутался, контролирует ли он доступ к вектору, или его содержимому, или и то, и другое. После уточнения области его действия ProcessResults
можно было бы расширить, чтобы проверить достоверность полезной нагрузки в пределах блокировки.
Комментарии:
1. Это то, что проверяет метод IsComplete, здесь не о чем беспокоиться… Я думал о блокировке обратного вызова, но они вызываются в произвольное время произвольными потоками, поскольку сервер выполняет асинхронные задачи. Блокировка одного мьютекса синхронизируется и, таким образом, образует бутылочное горлышко, тогда как в настоящее время обратные вызовы могут выполняться параллельно.
2. @AJG85 — поскольку ваш обратный вызов получает необработанный указатель, вы обязательно должны убедиться, что он все еще действителен, прежде чем использовать его. Можете ли вы преобразовать интеллектуальный указатель в тип с подсчетом ссылок и заблокировать вектор только для проверки, что все в порядке, и получить безопасную ссылку на объект? Тогда основная часть вашей логики обратного вызова все еще может выполняться параллельно. Другая проблема заключается в том, вмешивается ли манипулирование объектом в обратном вызове в логику любого другого потока, опять же, это проблема, если это так.
3. Я пропустил проверки null и большую часть кода, который не казался релевантным. Также «SmartPtr» имеет другое имя, но это созданный нами объект с подсчетом ссылок, совместимый с контейнерами STL (возможно, когда-нибудь от него откажутся в пользу C 0x)… существует слабый класс-друг указателя, который я мог бы использовать для получения ссылки, но срок действия изменений должен выходить за рамки обратного вызова и потока. Также ничто другое не может видеть или использовать частный объект до окончания обратного вызова результатов процесса, когда он запускает обратный вызов завершения.
Ответ №2:
Нет, это небезопасно.
ProcessResults
работает со структурой данных, переданной ему через DataObject
. Это указывает на то, что у вас общее состояние между разными потоками, и если оба потока работают со структурой данных одновременно, у вас могут возникнуть некоторые проблемы.
Комментарии:
1. Для каждого извлечения выполняется только один обратный вызов. Таким образом, только один поток обращается к указателю в любое время. Если бы вы сделали 5 запросов, это выделило бы 5 объектов, создало 5 привязок, запустило бы их, было бы вызвано 5 обратных вызовов в 5 других потоках для выполнения грязной работы, а затем все было бы очищено позже, при следующем отправлении дополнительных запросов, если они будут выполнены.
2. Также «завершенность» определяется запуском обратного вызова, завершением его обработки и последующим запуском полного обратного вызова в конце его выполнения.
3. Ах, назначения, выполняемые при обратном вызове, как бы «подразумевают», что вы используете
DataObject
для передачи другой структуры, чтобы метод обратного вызова работал с ней. Не могли бы вы дополнить вопрос немного более подробной информацией о семантике метода обратного вызова, т. Е. над чем он работает и является ли он общим для потоков?4. Он вызывается между потоками, но, насколько я понимаю, у каждого потока будет свой собственный стек, так что пока между потоками ничего не разделяется, все может быть в порядке… Если хотите, я опубликую реальный обратный вызов в виде правки.
Ответ №3:
Обновление указателя должно быть атомарной операцией, но вы можете использовать InterlockedExchangePointer
(в Windows) для уверенности. Не уверен, каким был бы эквивалент Linux.
Единственным соображением в этом случае было бы, использует ли один поток устаревший указатель. Удаляет ли другой поток объект, на который указывает исходный указатель? Если это так, то у вас есть определенная проблема.
Комментарии:
1.Он не обновляет указатель,
ProcessResults
оперирует структурой данных, переданной ему.2. @Khaled: Это указатель, но он вызывает методы и изменяет элементы объекта, на который указывает указатель.
3. Итак, мне нужно использовать атомарный указатель? Мне не нужно менять местами то, на что указывает указатель, чтобы просто использовать это. Также другой поток ничего не удаляет, поскольку освобождение того, что было создано в другом потоке, звучит как плохая новость в любой день недели.
4. @AJG85: Я имел в виду: «Проблема не в обновлении указателя», глядя на формулировку моего предыдущего комментария, я признаю, что это сбивает с толку. В любом случае, реальная проблема, похоже, заключается в функции обратного вызова.