从另一个线程修改向量中指针的数据是否安全?

发布于 2024-11-15 01:41:22 字数 3117 浏览 2 评论 0原文

事情似乎很有效,但我不确定这是否是最好的方法。

基本上我有一个异步数据检索的对象。该对象有一个指针向量,它们在主线程上分配和取消分配。使用 boost 函数,过程结果回调与该向量中的指针之一绑定。当它触发时,它将在某个任意线程上运行并修改指针的数据。

现在,我在推入向量并擦除的部分周围有一些关键部分,以防异步检索对象收到更多请求,但我想知道回调中是否需要某种类型的保护来修改指针数据。

希望这个精简的伪代码能让事情变得更清楚:

class CAsyncRetriever
{
    // typedefs of boost functions

    class DataObject
    {
         // methods and members
    };

public:
    // Start single asynch retrieve with completion callback
    void Start(SomeArgs)
    {
        SetupRetrieve(SomeArgs);
        LaunchRetrieves();
    }

protected:
    void SetupRetrieve(SomeArgs)
    {
            // ...

        { // scope for data lock
            boost::lock_guard<boost::mutex> lock(m_dataMutex);
            m_inProgress.push_back(SmartPtr<DataObject>(new DataObject)));
            m_callback = boost::bind(&CAsyncRetriever::ProcessResults, this, _1, m_inProgress.back());
        }

            // ...
    }

    void ProcessResults(DataObject* data)
    {
                // CALLED ON ANOTHER THREAD ... IS THIS SAFE?
        data->m_SomeMember.SomeMethod();
                data->m_SomeOtherMember = SomeStuff;
    }

    void Cleanup()
    {
                // ...

        { // scope for data lock
            boost::lock_guard<boost::mutex> lock(m_dataMutex);
            while(!m_inProgress.empty() && m_inProgress.front()->IsComplete())
                m_inProgress.erase(m_inProgress.begin());
        }

                // ...
         }

private:
    std::vector<SmartPtr<DataObject>> m_inProgress;
    boost::mutex m_dataMutex;
        // other members
};

编辑:这是 ProcessResults 回调的实际代码(加上注释以供您参考)

    void ProcessResults(CRetrieveResults* pRetrieveResults, CRetData* data)
        {
// pRetrieveResults is delayed binding that server passes in when invoking callback in thread pool
// data is raw pointer to ref counted object in vector of main thread (the DataObject* in question)

                // if there was an error set the code on the atomic int in object
            data->m_nErrorCode.Store_Release(pRetrieveResults->GetErrorCode());

                // generic iterator of results bindings for generic sotrage class item
            TPackedDataIterator<GenItem::CBind> dataItr(&pRetrieveResults->m_DataIter);
                // namespace function which will iterate results and initialize generic storage
            GenericStorage::InitializeItems<GenItem>(&data->m_items, dataItr, pRetrieveResults->m_nTotalResultsFound); // this is potentially time consuming depending on the amount of results and amount of columns that were bound in storage class definition (i.e.about 8 seconds for a million equipment items in release)
                // atomic uint32_t that is incremented when kicking off async retrieve
            m_nStarted.Decrement(); // this one is done processing

                // boost function completion callback bound to interface that requested results
            data->m_complete(data->m_items);
        }

Things seem to be working but I'm unsure if this is the best way to go about it.

Basically I have an object which does asynchronous retrieval of data. This object has a vector of pointers which are allocated and de-allocated on the main thread. Using boost functions a process results callback is bound with one of the pointers in this vector. When it fires it will be running on some arbitrary thread and modify the data of the pointer.

Now I have critical sections around the parts that are pushing into the vector and erasing in case the asynch retrieval object is receives more requests but I'm wondering if I need some kind of guard in the callback that is modifying the pointer data as well.

Hopefully this slimmed down pseudo code makes things more clear:

class CAsyncRetriever
{
    // typedefs of boost functions

    class DataObject
    {
         // methods and members
    };

public:
    // Start single asynch retrieve with completion callback
    void Start(SomeArgs)
    {
        SetupRetrieve(SomeArgs);
        LaunchRetrieves();
    }

protected:
    void SetupRetrieve(SomeArgs)
    {
            // ...

        { // scope for data lock
            boost::lock_guard<boost::mutex> lock(m_dataMutex);
            m_inProgress.push_back(SmartPtr<DataObject>(new DataObject)));
            m_callback = boost::bind(&CAsyncRetriever::ProcessResults, this, _1, m_inProgress.back());
        }

            // ...
    }

    void ProcessResults(DataObject* data)
    {
                // CALLED ON ANOTHER THREAD ... IS THIS SAFE?
        data->m_SomeMember.SomeMethod();
                data->m_SomeOtherMember = SomeStuff;
    }

    void Cleanup()
    {
                // ...

        { // scope for data lock
            boost::lock_guard<boost::mutex> lock(m_dataMutex);
            while(!m_inProgress.empty() && m_inProgress.front()->IsComplete())
                m_inProgress.erase(m_inProgress.begin());
        }

                // ...
         }

private:
    std::vector<SmartPtr<DataObject>> m_inProgress;
    boost::mutex m_dataMutex;
        // other members
};

Edit: This is the actual code for the ProccessResults callback (plus comments for your benefit)

    void ProcessResults(CRetrieveResults* pRetrieveResults, CRetData* data)
        {
// pRetrieveResults is delayed binding that server passes in when invoking callback in thread pool
// data is raw pointer to ref counted object in vector of main thread (the DataObject* in question)

                // if there was an error set the code on the atomic int in object
            data->m_nErrorCode.Store_Release(pRetrieveResults->GetErrorCode());

                // generic iterator of results bindings for generic sotrage class item
            TPackedDataIterator<GenItem::CBind> dataItr(&pRetrieveResults->m_DataIter);
                // namespace function which will iterate results and initialize generic storage
            GenericStorage::InitializeItems<GenItem>(&data->m_items, dataItr, pRetrieveResults->m_nTotalResultsFound); // this is potentially time consuming depending on the amount of results and amount of columns that were bound in storage class definition (i.e.about 8 seconds for a million equipment items in release)
                // atomic uint32_t that is incremented when kicking off async retrieve
            m_nStarted.Decrement(); // this one is done processing

                // boost function completion callback bound to interface that requested results
            data->m_complete(data->m_items);
        }

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(3

浮生面具三千个 2024-11-22 01:41:22

就目前情况而言,Cleanup 代码似乎可以销毁正在执行 ProcessResults 回调的对象。当您在回调中取消引用指针时,这会导致问题。

我的建议是,您扩展 m_dataMutex 的语义以包含回调,但如果回调长时间运行,或者可能在 SetupRetrieve 中内联发生(有时这确实如此)发生 - 虽然在这里你声明回调是在不同的线程上,在这种情况下你没问题)然后事情就更复杂了。目前,m_dataMutex 对于它是否控制对向量或其内容或两者的访问有点困惑。明确其范围后,可以增强 ProcessResults 以验证锁内有效负载的有效性。

As it stands, it appears that the Cleanup code can destroy an object for which a callback to ProcessResults is in flight. That's going to cause problems when you deref the pointer in the callback.

My suggestion would be that you extend the semantics of your m_dataMutex to encompass the callback, though if the callback is long-running, or can happen inline within SetupRetrieve (sometimes this does happen - though here you state the callback is on a different thread, in which case you are OK) then things are more complex. Currently m_dataMutex is a bit confused about whether it controls access to the vector, or its contents, or both. With its scope clarified, ProcessResults could then be enhanced to verify validity of the payload within the lock.

夏夜暖风 2024-11-22 01:41:22

不,这不安全。

ProcessResults 对通过DataObject 传递给它的数据结构进行操作。它表明您在不同线程之间共享状态,如果两个线程同时对数据结构进行操作,您可能会遇到一些麻烦。

No, it isn't safe.

ProcessResults operates on the data structure passed to it through DataObject. It indicates that you have shared state between different threads, and if both threads operate on the data structure concurrently you might have some trouble coming your way.

无戏配角 2024-11-22 01:41:22

更新指针应该是一个原子操作,但您可以使用 InterlockedExchangePointer(在 Windows 中)可以肯定。不确定 Linux 的等价物是什么。

唯一需要考虑的是一个线程是否正在使用过时的指针。另一个线程是否删除了原指针所指向的对象?如果是这样,那么你肯定有问题。

Updating a pointer should be an atomic operation, but you can use InterlockedExchangePointer (in Windows) to be sure. Not sure what the Linux equivalent would be.

The only consideration then would be if one thread is using an obsolete pointer. Does the other thread delete the object pointed to by the original pointer? If so, you have a definite problem.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文