C++、x86-64 中的读写线程安全智能指针

发布于 2024-12-13 22:05:55 字数 1018 浏览 0 评论 0原文

我开发了一些无锁数据结构,出现了以下问题。

我有编写器线程,它在堆上创建对象并将它们包装在带有引用计数器的智能指针中。我还有很多与这些对象一起使用的读者线程。代码可以如下所示:

SmartPtr ptr;

class Reader : public Thread {
    virtual void Run {
       for (;;) {
           SmartPtr local(ptr);
           // do smth   
       }
    }   
};

class Writer : public Thread {
    virtual void Run {
       for (;;) {
           SmartPtr newPtr(new Object);    
           ptr = newPtr;  
       }
    }
};

int main() {
    Pool* pool = SystemThreadPool();
    pool->Run(new Reader());
    pool->Run(new Writer());
    for (;;) // wait for crash :(
}

当我创建 ptr 的线程本地副本时,这意味着至少

  1. 读取一个地址。
  2. 增加参考计数器。

我无法自动执行这两个操作,因此有时我的读者会使用已删除的对象。

问题是 - 我应该使用哪种智能指针来实现多个线程的读写访问以及正确的内存管理?解决方案应该存在,因为Java程序员甚至不关心这样的问题,只是依赖于所有对象都是引用,并且只有在没有人使用它们时才被删除。

对于PowerPC,我发现http://drdobbs.com/184401888,看起来不错,但使用加载链接和存储条件指令,我们x86 中没有。

据我所知,boost 指针仅使用锁来提供此类功能。我需要无锁解决方案。

I develop some lock free data structure and following problem arises.

I have writer thread that creates objects on heap and wraps them in smart pointer with reference counter. I also have a lot of reader threads, that work with these objects. Code can look like this:

SmartPtr ptr;

class Reader : public Thread {
    virtual void Run {
       for (;;) {
           SmartPtr local(ptr);
           // do smth   
       }
    }   
};

class Writer : public Thread {
    virtual void Run {
       for (;;) {
           SmartPtr newPtr(new Object);    
           ptr = newPtr;  
       }
    }
};

int main() {
    Pool* pool = SystemThreadPool();
    pool->Run(new Reader());
    pool->Run(new Writer());
    for (;;) // wait for crash :(
}

When I create thread-local copy of ptr it means at least

  1. Read an address.
  2. Increment reference counter.

I can't do these two operations atomically and thus sometimes my readers work with deleted object.

The question is - what kind of smart pointer should I use to make read-write access from several threads with correct memory management possible? Solution should exist, since Java programmers don't even care about such a problem, simply relying on that all objects are references and are deleted only when nobody uses them.

For PowerPC I found http://drdobbs.com/184401888, looks nice, but uses Load-Linked and Store-Conditional instructions, that we don't have in x86.

As far I as I understand, boost pointers provide such functionality only using locks. I need lock free solution.

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(4

野却迷人 2024-12-20 22:05:55

boost::shared_ptr 有atomic_store,它使用“无锁”自旋锁,对于 99% 的可能情况应该足够快。

    boost::shared_ptr<Object> ptr;
class Reader : public Thread {
    virtual void Run {
       for (;;) {
           boost::shared_ptr<Object> local(boost::atomic_load(&ptr));
           // do smth   
       }
    }   
};

class Writer : public Thread {
    virtual void Run {
       for (;;) {
           boost::shared_ptr<Object> newPtr(new Object);    
           boost::atomic_store(&ptr, newPtr);
       }
    }
};

int main() {
    Pool* pool = SystemThreadPool();
    pool->Run(new Reader());
    pool->Run(new Writer());
    for (;;)
}

编辑:

为了回应下面的评论,实现位于“boost/shared_ptr.hpp”中...

template<class T> void atomic_store( shared_ptr<T> * p, shared_ptr<T> r )
{
    boost::detail::spinlock_pool<2>::scoped_lock lock( p );
    p->swap( r );
}

template<class T> shared_ptr<T> atomic_exchange( shared_ptr<T> * p, shared_ptr<T> r )
{
    boost::detail::spinlock & sp = boost::detail::spinlock_pool<2>::spinlock_for( p );

    sp.lock();
    p->swap( r );
    sp.unlock();

    return r; // return std::move( r )
}

boost::shared_ptr have atomic_store which uses a "lock-free" spinlock which should be fast enough for 99% of possible cases.

    boost::shared_ptr<Object> ptr;
class Reader : public Thread {
    virtual void Run {
       for (;;) {
           boost::shared_ptr<Object> local(boost::atomic_load(&ptr));
           // do smth   
       }
    }   
};

class Writer : public Thread {
    virtual void Run {
       for (;;) {
           boost::shared_ptr<Object> newPtr(new Object);    
           boost::atomic_store(&ptr, newPtr);
       }
    }
};

int main() {
    Pool* pool = SystemThreadPool();
    pool->Run(new Reader());
    pool->Run(new Writer());
    for (;;)
}

EDIT:

In response to comment below, the implementation is in "boost/shared_ptr.hpp"...

template<class T> void atomic_store( shared_ptr<T> * p, shared_ptr<T> r )
{
    boost::detail::spinlock_pool<2>::scoped_lock lock( p );
    p->swap( r );
}

template<class T> shared_ptr<T> atomic_exchange( shared_ptr<T> * p, shared_ptr<T> r )
{
    boost::detail::spinlock & sp = boost::detail::spinlock_pool<2>::spinlock_for( p );

    sp.lock();
    p->swap( r );
    sp.unlock();

    return r; // return std::move( r )
}
不即不离 2024-12-20 22:05:55

通过一些技巧,您应该能够使用 InterlockedCompareExchange128 来完成此操作。将引用计数和指针存储在 2 元素 __int64 数组中。如果引用计数位于 array[0] 中且指针位于 array[1] 中,原子更新将如下所示:

while(true)
{
    __int64 comparand[2];
    comparand[0] = refCount;
    comparand[1] = pointer;
    if(1 == InterlockedCompareExchange128(
        array,
        pointer,
        refCount + 1,
        comparand))
    {
        // Pointer is ready for use. Exit the while loop.
    }
}

如果 InterlockedCompareExchange128 内部函数不可用于您的编译器,那么您可以使用底层 CMPXCHG16B 指令,如果您不这样做的话不介意用汇编语言乱搞。

With some jiggery-pokery you should be able to accomplish this using InterlockedCompareExchange128. Store the reference count and pointer in a 2 element __int64 array. If reference count is in array[0] and pointer in array[1] the atomic update would look like this:

while(true)
{
    __int64 comparand[2];
    comparand[0] = refCount;
    comparand[1] = pointer;
    if(1 == InterlockedCompareExchange128(
        array,
        pointer,
        refCount + 1,
        comparand))
    {
        // Pointer is ready for use. Exit the while loop.
    }
}

If an InterlockedCompareExchange128 intrinsic function isn't available for your compiler then you may use the underlying CMPXCHG16B instruction instead, if you don't mind mucking around in assembly language.

痴梦一场 2024-12-20 22:05:55

RobH 提出的解决方案不起作用。它与原来的问题有同样的问题:当访问引用计数对象时,它可能已经被删除了。

我认为在没有全局锁(如 boost::atomic_store )或条件读/写指令的情况下解决问题的唯一方法是以某种方式延迟对象(或共享引用计数对象,如果使用此类东西)的销毁。所以zennehoy有一个好主意,但是他的方法太不安全了。

我可能会这样做的方法是在编写器线程中保留所有指针的副本,以便编写器可以控制对象的销毁:

class Writer : public Thread {
    virtual void Run() {
        list<SmartPtr> ptrs; //list that holds all the old ptr values        

        for (;;) {
            SmartPtr newPtr(new Object);
            if(ptr)
                ptrs.push_back(ptr); //push previous pointer into the list
            ptr = newPtr;

            //Periodically go through the list and destroy objects that are not
            //referenced by other threads
            for(auto it=ptrs.begin(); it!=ptrs.end(); )
                if(it->refCount()==1)
                    it = ptrs.erase(it);
                else
                    ++it;
       }
    }
};

但是,对智能指针类仍然有要求。这不适用于shared_ptr,因为读取和写入不是原子的。它几乎可以与 boost::intrusive_ptr 一起使用。 intrusive_ptr 上的赋值是这样实现的(伪代码):

//create temporary from rhs
tmp.ptr = rhs.ptr;
if(tmp.ptr)
    intrusive_ptr_add_ref(tmp.ptr);

//swap(tmp,lhs)
T* x = lhs.ptr;
lhs.ptr = tmp.ptr;
tmp.ptr = x;

//destroy temporary
if(tmp.ptr)
    intrusive_ptr_release(tmp.ptr);

据我所知,这里唯一缺少的是 lhs.ptr = tmp.ptr; 之前的编译器级别内存栅栏。添加后,读取 rhs 和写入 lhs 在严格条件下都是线程安全的:1) x86 或 x64 架构 2) 原子引用计数 3) rhs 赋值期间引用计数不得为零(由上面的 Writer 代码保证)4) 只有一个线程写入 lhs(使用 CAS,您可以有多个 writer)。

无论如何,您可以基于 intrusive_ptr 创建自己的智能指针类并进行必要的更改。绝对比重新实现shared_ptr更容易。此外,如果你想要性能,侵入式是最好的选择。

The solution proposed by RobH doesn't work. It has the same problem as the original question: when accessing the reference count object, it might already have been deleted.

The only way I see of solving the problem without a global lock (as in boost::atomic_store) or conditional read/write instructions is to somehow delay the destruction of the object (or the shared reference count object if such thing is used). So zennehoy has a good idea but his method is too unsafe.

The way I might do it is by keeping copies of all the pointers in the writer thread so that the writer can control the destruction of the objects:

class Writer : public Thread {
    virtual void Run() {
        list<SmartPtr> ptrs; //list that holds all the old ptr values        

        for (;;) {
            SmartPtr newPtr(new Object);
            if(ptr)
                ptrs.push_back(ptr); //push previous pointer into the list
            ptr = newPtr;

            //Periodically go through the list and destroy objects that are not
            //referenced by other threads
            for(auto it=ptrs.begin(); it!=ptrs.end(); )
                if(it->refCount()==1)
                    it = ptrs.erase(it);
                else
                    ++it;
       }
    }
};

However there are still requirements for the smart pointer class. This doesn't work with shared_ptr as the reads and writes are not atomic. It almost works with boost::intrusive_ptr. The assignment on intrusive_ptr is implemented like this (pseudocode):

//create temporary from rhs
tmp.ptr = rhs.ptr;
if(tmp.ptr)
    intrusive_ptr_add_ref(tmp.ptr);

//swap(tmp,lhs)
T* x = lhs.ptr;
lhs.ptr = tmp.ptr;
tmp.ptr = x;

//destroy temporary
if(tmp.ptr)
    intrusive_ptr_release(tmp.ptr);

As far as I understand the only thing missing here is a compiler level memory fence before lhs.ptr = tmp.ptr;. With that added, both reading rhs and writing lhs would be thread-safe under strict conditions: 1) x86 or x64 architecture 2) atomic reference counting 3) rhs refcount must not go to zero during the assignment (guaranteed by the Writer code above) 4) only one thread writing to lhs (using CAS you could have several writers).

Anyway, you could create your own smart pointer class based on intrusive_ptr with necessary changes. Definitely easier than re-implementing shared_ptr. And besides, if you want performance, intrusive is the way to go.

手长情犹 2024-12-20 22:05:55

这在 Java 中更容易工作的原因是垃圾收集。在 C++ 中,当您想要删除某个值时,您必须手动确保该值不是刚刚开始被其他线程使用。

我在类似情况下使用的解决方案是简单地延迟删除该值。我创建一个单独的线程来迭代要删除的内容列表。当我想删除某些内容时,我会将其添加到此列表中并带有时间戳。删除线程会等待该时间戳之后的某个固定时间,然后才实际删除该值。您只需确保延迟足够大,以保证该值的任何临时使用都已完成。

对我来说 100 毫秒就足够了,为了安全起见,我选择了几秒钟。

The reason this works much more easily in java is garbage collection. In C++, you have to manually ensure that a value is not just starting to be used by a different thread when you want to delete it.

A solution I've used in a similar situation is to simply delay the deletion of the value. I create a separate thread that iterates through a list of things to be deleted. When I want to delete something, I add it to this list with a timestamp. The deleting thread waits until some fixed time after this timestamp before actually deleting the value. You just have to make sure that the delay is large enough to guarantee that any temporary use of the value has completed.

100 milliseconds would have been enough in my case, I chose a few seconds to be safe.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文