C++0x has no semaphores? How to synchronize threads?



Is it true that C++0x will come without semaphores? There are already some questions on Stack Overflow regarding the use of semaphores. I use them (posix semaphores) all the time to let a thread wait for some event in another thread:

void thread0(...)
{
  doSomething0();

  event1.wait();

  ...
}

void thread1(...)
{
  doSomething1();

  event1.post();

  ...
}

If I were to do that with a mutex:

void thread0(...)
{
  doSomething0();

  event1.lock(); event1.unlock();

  ...
}

void thread1(...)
{
  event1.lock();

  doSomething1();

  event1.unlock();

  ...
}

Problem: it's ugly and it's not guaranteed that thread1 locks the mutex first (given that the same thread should lock and unlock a mutex, you also can't lock event1 before thread0 and thread1 have started).

So since boost doesn't have semaphores either, what is the simplest way to achieve the above?

Comments (12)

忘东忘西忘不掉你 2024-10-21 07:24:48


You can easily build one from a mutex and a condition variable:

#include <mutex>
#include <condition_variable>

class semaphore {
    std::mutex mutex_;
    std::condition_variable condition_;
    unsigned long count_ = 0; // Initialized as locked.

public:
    void release() {
        std::lock_guard<decltype(mutex_)> lock(mutex_);
        ++count_;
        condition_.notify_one();
    }

    void acquire() {
        std::unique_lock<decltype(mutex_)> lock(mutex_);
        while(!count_) // Handle spurious wake-ups.
            condition_.wait(lock);
        --count_;
    }

    bool try_acquire() {
        std::lock_guard<decltype(mutex_)> lock(mutex_);
        if(count_) {
            --count_;
            return true;
        }
        return false;
    }
};
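
As a usage sketch (my own illustration, not part of the original answer), the counting behaviour also lets one thread wait for several workers to finish:

#include <thread>
#include <vector>

semaphore done; // assumes the semaphore class above; count starts at 0

void worker()
{
    // ... do some work ...
    done.release(); // signal that this worker has finished
}

int main()
{
    const int n = 4;
    std::vector<std::thread> workers;
    for (int i = 0; i < n; ++i)
        workers.emplace_back(worker);

    for (int i = 0; i < n; ++i)
        done.acquire(); // wait until every worker has signalled

    for (auto& t : workers)
        t.join();
}
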
套路撩心 2024-10-21 07:24:48


Based on Maxim Yegorushkin's answer, I tried to make the example in C++11 style.

#include <mutex>
#include <condition_variable>

class Semaphore {
public:
    Semaphore (int count_ = 0)
        : count(count_) {}

    inline void notify()
    {
        std::unique_lock<std::mutex> lock(mtx);
        count++;
        cv.notify_one();
    }

    inline void wait()
    {
        std::unique_lock<std::mutex> lock(mtx);

        while(count == 0){
            cv.wait(lock);
        }
        count--;
    }

private:
    std::mutex mtx;
    std::condition_variable cv;
    int count;
};
抠脚大汉 2024-10-21 07:24:48


I decided to write the most robust/generic C++11 semaphore I could, in the style of the standard as much as possible (note the using semaphore = ... at the end; you would normally just use the name semaphore, similar to how you normally use string rather than basic_string):

#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <mutex>

template <typename Mutex, typename CondVar>
class basic_semaphore {
public:
    using native_handle_type = typename CondVar::native_handle_type;

    explicit basic_semaphore(size_t count = 0);
    basic_semaphore(const basic_semaphore&) = delete;
    basic_semaphore(basic_semaphore&&) = delete;
    basic_semaphore& operator=(const basic_semaphore&) = delete;
    basic_semaphore& operator=(basic_semaphore&&) = delete;

    void notify();
    void wait();
    bool try_wait();
    template<class Rep, class Period>
    bool wait_for(const std::chrono::duration<Rep, Period>& d);
    template<class Clock, class Duration>
    bool wait_until(const std::chrono::time_point<Clock, Duration>& t);

    native_handle_type native_handle();

private:
    Mutex   mMutex;
    CondVar mCv;
    size_t  mCount;
};

using semaphore = basic_semaphore<std::mutex, std::condition_variable>;

template <typename Mutex, typename CondVar>
basic_semaphore<Mutex, CondVar>::basic_semaphore(size_t count)
    : mCount{count}
{}

template <typename Mutex, typename CondVar>
void basic_semaphore<Mutex, CondVar>::notify() {
    std::lock_guard<Mutex> lock{mMutex};
    ++mCount;
    mCv.notify_one();
}

template <typename Mutex, typename CondVar>
void basic_semaphore<Mutex, CondVar>::wait() {
    std::unique_lock<Mutex> lock{mMutex};
    mCv.wait(lock, [&]{ return mCount > 0; });
    --mCount;
}

template <typename Mutex, typename CondVar>
bool basic_semaphore<Mutex, CondVar>::try_wait() {
    std::lock_guard<Mutex> lock{mMutex};

    if (mCount > 0) {
        --mCount;
        return true;
    }

    return false;
}

template <typename Mutex, typename CondVar>
template<class Rep, class Period>
bool basic_semaphore<Mutex, CondVar>::wait_for(const std::chrono::duration<Rep, Period>& d) {
    std::unique_lock<Mutex> lock{mMutex};
    auto finished = mCv.wait_for(lock, d, [&]{ return mCount > 0; });

    if (finished)
        --mCount;

    return finished;
}

template <typename Mutex, typename CondVar>
template<class Clock, class Duration>
bool basic_semaphore<Mutex, CondVar>::wait_until(const std::chrono::time_point<Clock, Duration>& t) {
    std::unique_lock<Mutex> lock{mMutex};
    auto finished = mCv.wait_until(lock, t, [&]{ return mCount > 0; });

    if (finished)
        --mCount;

    return finished;
}

template <typename Mutex, typename CondVar>
typename basic_semaphore<Mutex, CondVar>::native_handle_type basic_semaphore<Mutex, CondVar>::native_handle() {
    return mCv.native_handle();
}
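
A short usage sketch of the timed wait (my own illustration, assuming the definitions above; the function names are hypothetical):

#include <chrono>

semaphore sem;

void producer()
{
    // ... produce something ...
    sem.notify();
}

bool consume_with_timeout()
{
    // Wait up to 100 ms for the producer; false means we timed out.
    if (sem.wait_for(std::chrono::milliseconds(100))) {
        // ... consume ...
        return true;
    }
    return false;
}
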
私野 2024-10-21 07:24:48


C++20 finally has semaphores - std::counting_semaphore<max_count>.

These have (at least) the following methods:

  • acquire() (blocking)
  • try_acquire() (non-blocking, returns immediately)
  • try_acquire_for() (blocks for at most a given duration before giving up)
  • try_acquire_until() (blocks until a given time point at the latest before giving up)
  • release()

You can read these CppCon 2019 presentation slides, or watch the video. There's also the official proposal P0514R4, but it may not be up-to-date with actual C++20.
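
As a sketch (not part of the original answer), the question's event pattern in C++20 could use std::binary_semaphore, which is an alias for std::counting_semaphore<1>:

#include <semaphore>
#include <thread>

std::binary_semaphore event1{0}; // starts at 0, so acquire() blocks until release()

void thread0()
{
    // doSomething0();
    event1.acquire(); // waits for the event posted by thread1
    // ...
}

void thread1()
{
    // doSomething1();
    event1.release(); // posts the event
    // ...
}

int main()
{
    std::thread t1(thread1), t0(thread0);
    t0.join();
    t1.join();
}
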

も让我眼熟你 2024-10-21 07:24:48


In accordance with POSIX semaphores, I would add

class semaphore
{
    ...
    bool trywait()
    {
        boost::mutex::scoped_lock lock(mutex_);
        if(count_)
        {
            --count_;
            return true;
        }
        else
        {
            return false;
        }
    }
};

And I much prefer using a synchronisation mechanism at a convenient level of abstraction, rather than always copy-pasting a stitched-together version built from more basic primitives.

べ繥欢鉨o。 2024-10-21 07:24:48


You can also check out cpp11-on-multicore - it has a portable and optimal semaphore implementation.

The repository also contains other threading goodies that complement C++11 threading.

殤城〤 2024-10-21 07:24:48


You can work with a mutex and a condition variable. You gain exclusive access with the mutex, check whether you want to continue or need to wait for the other end. If you need to wait, you wait on the condition. When the other thread determines that you can continue, it signals the condition.

There is a short example in the boost::thread library that you can most probably just copy (the C++0x and boost thread libs are very similar).
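
A minimal sketch of that pattern for the question's event1 (my own illustration, using C++11 std::condition_variable; the names are made up):

#include <condition_variable>
#include <mutex>

std::mutex m;
std::condition_variable cv;
bool signalled = false; // the "event" state, protected by m

void wait_event() // called by thread0
{
    std::unique_lock<std::mutex> lock(m);
    cv.wait(lock, []{ return signalled; }); // predicate handles spurious wake-ups
}

void post_event() // called by thread1
{
    {
        std::lock_guard<std::mutex> lock(m);
        signalled = true;
    }
    cv.notify_one();
}
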

明媚如初 2024-10-21 07:24:48


An RAII semaphore wrapper for threads can also be useful:

class ScopedSemaphore
{
public:
    explicit ScopedSemaphore(Semaphore& sem) : m_Semaphore(sem) { m_Semaphore.Wait(); }
    ScopedSemaphore(const ScopedSemaphore&) = delete;
    ~ScopedSemaphore() { m_Semaphore.Notify(); }

   ScopedSemaphore& operator=(const ScopedSemaphore&) = delete;

private:
    Semaphore& m_Semaphore;
};

Usage example in a multithreaded app:

boost::ptr_vector<std::thread> threads;
Semaphore semaphore;

for (...)
{
    ...
    auto t = new std::thread([..., &semaphore]
    {
        ScopedSemaphore scopedSemaphore(semaphore);
        ...
    }
    );
    threads.push_back(t);
}

for (auto& t : threads)
    t.join();
画中仙 2024-10-21 07:24:48


I found that shared_ptr and weak_ptr, along with a list, did the job I needed. My issue was, I had several clients wanting to interact with a host's internal data. Typically, the host updates the data on its own; however, if a client requests it, the host needs to stop updating until no clients are accessing the host data. At the same time, a client could ask for exclusive access, so that no other clients, nor the host, could modify that host data.

How I did this was, I created a struct:

struct UpdateLock
{
    typedef std::shared_ptr< UpdateLock > ptr;
};

Each client would have a member of such:

UpdateLock::ptr m_myLock;

Then the host would have a weak_ptr member for exclusivity, and a list of weak_ptrs for non-exclusive locks:

std::weak_ptr< UpdateLock > m_exclusiveLock;
std::list< std::weak_ptr< UpdateLock > > m_locks;

There is a function to enable locking, and another function to check if the host is locked:

UpdateLock::ptr LockUpdate( bool exclusive );       
bool IsUpdateLocked( bool exclusive ) const;

I test for locks in LockUpdate, IsUpdateLocked, and periodically in the host's Update routine. Testing for a lock is as simple as checking whether the weak_ptrs have expired and removing any expired ones from the m_locks list (I only do this during the host update), after which I can check whether the list is empty. At the same time, I get automatic unlocking when a client resets the shared_ptr it is hanging onto, which also happens when a client is destroyed.

The overall effect is that, since clients rarely need exclusivity (typically reserved for additions and deletions only), most of the time a request to LockUpdate( false ), that is to say non-exclusive, succeeds as long as (! m_exclusiveLock). And a LockUpdate( true ), a request for exclusivity, succeeds only when both (! m_exclusiveLock) and (m_locks.empty()).

A queue could be added to mediate between exclusive and non-exclusive locks; however, I have had no collisions thus far, so I intend to wait until that happens to add the solution (mostly so I have a real-world test condition).

So far this is working well for my needs; I can imagine the need to expand this, and some issues that might arise over expanded use, however, this was quick to implement, and required very little custom code.
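
For illustration only (this is my sketch of the described approach, not the poster's actual code), LockUpdate and IsUpdateLocked could look roughly like this:

#include <list>
#include <memory>

struct UpdateLock
{
    typedef std::shared_ptr< UpdateLock > ptr;
};

class Host
{
    std::weak_ptr< UpdateLock > m_exclusiveLock;
    std::list< std::weak_ptr< UpdateLock > > m_locks;

public:
    bool IsUpdateLocked( bool exclusive ) const
    {
        if (!m_exclusiveLock.expired())
            return true;                  // a live exclusive lock blocks everyone
        if (!exclusive)
            return false;                 // non-exclusive requests only conflict with exclusivity
        for (const auto& w : m_locks)     // exclusive requests also conflict with live shared locks
            if (!w.expired())
                return true;
        return false;
    }

    UpdateLock::ptr LockUpdate( bool exclusive )
    {
        if (IsUpdateLocked( exclusive ))
            return nullptr;               // request denied; the caller may retry later

        auto lock = std::make_shared< UpdateLock >();
        if (exclusive)
            m_exclusiveLock = lock;
        else
            m_locks.push_back( lock );
        return lock;                      // the client stores it; resetting it releases the lock
    }
};
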

热血少△年 2024-10-21 07:24:48


Unlike the other answers, I propose a new version which:

  1. Unblocks all waiting threads before being deleted. In this case, deleting the semaphore wakes up all waiting threads, and the semaphore destructor exits only after everybody has woken up.
  2. Has a parameter to the wait() call, to automatically unlock the calling thread after the timeout in milliseconds has passed.
  3. Has an option on the constructor to limit the available resource count to the count the semaphore was initialized with. This way, calling notify() too many times will not increase how many resources the semaphore has.
#include <stdio.h>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <iostream>

std::recursive_mutex g_sync_mutex;
#define sync(x) do { \
        std::unique_lock<std::recursive_mutex> lock(g_sync_mutex); \
        x; \
    } while (false);

class Semaphore {
    int _count;
    bool _limit;
    int _all_resources;
    int _wakedup;
    std::mutex _mutex;
    std::condition_variable_any _condition_variable;

public:
    /**
     * count - how many resources this semaphore holds
     * limit - limit notify() calls only up to the count value (available resources)
     */
    Semaphore (int count, bool limit)
        : _count(count),
        _limit(limit),
        _all_resources(count),
        _wakedup(count)
    {
    }

    /**
     * Unlock all waiting threads before destructing the semaphore (to avoid their segfault later)
     */
    virtual ~Semaphore () {
        std::unique_lock<std::mutex> lock(_mutex);
        _wakeup(lock);
    }

    void _wakeup(std::unique_lock<std::mutex>& lock) {
        int lastwakeup = 0;

        while( _wakedup < _all_resources ) {
            lock.unlock();
            notify();
            lock.lock();
            // avoids 100% CPU usage if someone is not waking up properly
            if (lastwakeup == _wakedup) {
                std::this_thread::sleep_for( std::chrono::milliseconds(10) );
            }
            lastwakeup = _wakedup;
        }
    }

    // Mutex and condition variables are not movable and there is no need for smart pointers yet
    Semaphore(const Semaphore&) = delete;
    Semaphore& operator =(const Semaphore&) = delete;
    Semaphore(const Semaphore&&) = delete;
    Semaphore& operator =(const Semaphore&&) = delete;

    /**
     * Release one acquired resource.
     */
    void notify()
    {
        std::unique_lock<std::mutex> lock(_mutex);
        // sync(std::cerr << getTime() << "Calling notify(" << _count << ", " << _limit << ", " << _all_resources << ")" << std::endl);
        _count++;
        if (_limit && _count > _all_resources) {
            _count = _all_resources;
        }
        _condition_variable.notify_one();
    }

    /**
     * This function never blocks!
     * Return false if it would block when acquiring the lock. Otherwise it acquires the lock and returns true.
     */
    bool try_acquire() {
        std::unique_lock<std::mutex> lock(_mutex);
        // sync(std::cerr << getTime() << "Calling try_acquire(" << _count << ", " << _limit << ", " << _all_resources << ")" << std::endl);
        if(_count <= 0) {
            return false;
        }
        _count--;
        return true;
    }

    /**
     * Return true if the timeout expired, otherwise return false.
     * timeout - how many milliseconds to wait before automatically unlocking the wait() call.
     */
    bool wait(int timeout = 0) {
        std::unique_lock<std::mutex> lock(_mutex);
        // sync(std::cerr << getTime() << "Calling wait(" << _count << ", " << _limit << ", " << _all_resources << ")" << std::endl);
        _count--;
        _wakedup--;
        try {
            std::chrono::time_point<std::chrono::system_clock> timenow = std::chrono::system_clock::now();

            while(_count < 0) {
                if (timeout < 1) {
                    _condition_variable.wait(lock);
                }
                else {
                    std::cv_status status = _condition_variable.wait_until(lock, timenow + std::chrono::milliseconds(timeout));

                    if ( std::cv_status::timeout == status) {
                        _count++;
                        _wakedup++;
                        return true;
                    }
                }
            }
        }
        catch (...) {
            _count++;
            _wakedup++;
            throw;
        }
        _wakedup++;
        return false;
    }

    /**
     * Return true if calling wait() will block the calling thread
     */
    bool locked() {
        std::unique_lock<std::mutex> lock(_mutex);
        return _count <= 0;
    }

    /**
     * Return true if the semaphore has all resources available (as many as when it was created)
     */
    bool freed() {
        std::unique_lock<std::mutex> lock(_mutex);
        return _count >= _all_resources;
    }

    /**
     * Return how many resources are available:
     * - 0 means no free resources and calling wait() will block the calling thread
     * - a negative value means there are several threads being blocked
     * - a positive value means there are no threads waiting
     */
    int count() {
        std::unique_lock<std::mutex> lock(_mutex);
        return _count;
    }

    /**
     * Wake everybody who is waiting and reset the semaphore to its initial value.
     */
    void reset() {
        std::unique_lock<std::mutex> lock(_mutex);
        if(_count < 0) {
            _wakeup(lock);
        }
        _count = _all_resources;
    }
};

Utility to print the current timestamp:

std::string getTime() {
    char buffer[20];
#if defined( WIN32 )
    SYSTEMTIME wlocaltime;
    GetLocalTime(&wlocaltime);
    ::snprintf(buffer, sizeof buffer, "%02d:%02d:%02d.%03d ", wlocaltime.wHour, wlocaltime.wMinute, wlocaltime.wSecond, wlocaltime.wMilliseconds);
#else
    std::chrono::time_point< std::chrono::system_clock > now = std::chrono::system_clock::now();
    auto duration = now.time_since_epoch();
    auto hours = std::chrono::duration_cast< std::chrono::hours >( duration );
    duration -= hours;
    auto minutes = std::chrono::duration_cast< std::chrono::minutes >( duration );
    duration -= minutes;
    auto seconds = std::chrono::duration_cast< std::chrono::seconds >( duration );
    duration -= seconds;
    auto milliseconds = std::chrono::duration_cast< std::chrono::milliseconds >( duration );
    duration -= milliseconds;
    time_t theTime = time( NULL );
    struct tm* aTime = localtime( &theTime );
    ::snprintf(buffer, sizeof buffer, "%02d:%02d:%02d.%03ld ", aTime->tm_hour, aTime->tm_min, aTime->tm_sec, milliseconds.count());
#endif
    return buffer;
}

Example program using this semaphore:

// g++ -o test -Wall -Wextra -ggdb -g3 -pthread test.cpp && gdb --args ./test
// valgrind --leak-check=full --show-leak-kinds=all --track-origins=yes --verbose ./test
// procdump -accepteula -ma -e -f "" -x c:\ myexe.exe
int main(int argc, char* argv[]) {
    std::cerr << getTime() << "Creating Semaphore" << std::endl;
    Semaphore* semaphore = new Semaphore(1, false);
    semaphore->wait(1000);
    semaphore->wait(1000);
    std::cerr << getTime() << "Auto Unlocking Semaphore wait" << std::endl;

    std::this_thread::sleep_for( std::chrono::milliseconds(5000) );
    delete semaphore;

    std::cerr << getTime() << "Exiting after 10 seconds..." << std::endl;
    return 0;
}

Example output:

11:03:01.012 Creating Semaphore
11:03:02.012 Auto Unlocking Semaphore wait
11:03:07.012 Exiting after 10 seconds...

Extra function which uses a EventLoop to unlock the semaphores after some time:

std::shared_ptr<std::atomic<bool>> autowait(Semaphore* semaphore, int timeout, EventLoop<std::function<void()>>& eventloop, const char* source) {
    std::shared_ptr<std::atomic<bool>> waiting(std::make_shared<std::atomic<bool>>(true));
    sync(std::cerr << getTime() << "autowait '" << source << "'..." << std::endl);

    if (semaphore->try_acquire()) {
        eventloop.enqueue( timeout, [waiting, source, semaphore]{
            if ( (*waiting).load() ) {
                sync(std::cerr << getTime() << "Timeout '" << source << "'..." << std::endl);
                semaphore->notify();
            }
        } );
    }
    else {
        semaphore->wait(timeout);
    }
    return waiting;
}

Semaphore semaphore(1, false);
EventLoop<std::function<void()>>* eventloop = new EventLoop<std::function<void()>>(true);
std::shared_ptr<std::atomic<bool>> waiting_something = autowait(&semaphore, 45000, eventloop, "waiting_something");
面如桃花 2024-10-21 07:24:48


This is an old question, but I would like to offer another solution.
It seems you need not a semaphore but an event, like Windows Events.
Very efficient events can be implemented like the following:

#ifdef _MSC_VER
  #include <concrt.h>
#else
  // pthread implementation
  #include <cstddef>
  #include <cstdint>
  #include <shared_mutex>

namespace Concurrency
{
const unsigned int COOPERATIVE_TIMEOUT_INFINITE = (unsigned int)-1;
const size_t COOPERATIVE_WAIT_TIMEOUT = SIZE_MAX;

class event
{
public:
    event();
    ~event();

    size_t wait(unsigned int timeout = COOPERATIVE_TIMEOUT_INFINITE);
    void set();
    void reset();
    static size_t wait_for_multiple(event** _PPEvents, size_t _Count, bool _FWaitAll, unsigned int _Timeout = COOPERATIVE_TIMEOUT_INFINITE);

    static const unsigned int timeout_infinite = COOPERATIVE_TIMEOUT_INFINITE;
    
private:
    int d;
    std::shared_mutex guard;
};

};

namespace concurrency = Concurrency;

#include <unistd.h>
#include <errno.h>
#include <sys/eventfd.h>
#include <sys/epoll.h>

#include <chrono>

#include "../HandleHolder.h"

typedef CommonHolder<int, close> fd_holder;

namespace Concurrency
{
    int watch(int ep_fd, int fd)
    {
        epoll_event ep_event;
        ep_event.events = EPOLLIN;
        ep_event.data.fd = fd;

        return epoll_ctl(ep_fd, EPOLL_CTL_ADD, fd, &ep_event);
    }

    event::event()
        : d(eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK))
    {
    }

    event::~event()
    {
        std::unique_lock<std::shared_mutex> lock(guard);
        close(d);
        d = -1;
    }

    size_t event::wait(unsigned int timeout)
    {
        fd_holder ep_fd(epoll_create1(EPOLL_CLOEXEC));
        {
            std::shared_lock<std::shared_mutex> lock(guard);
            if (d == -1 || watch(ep_fd.GetHandle(), d) < 0)
                return COOPERATIVE_WAIT_TIMEOUT;
        }

        epoll_event ep_event;
        return epoll_wait(ep_fd.GetHandle(), &ep_event, 1, timeout) == 1 && (ep_event.events & EPOLLIN) ? 0 : COOPERATIVE_WAIT_TIMEOUT;
    }

    void event::set()
    {
        uint64_t count = 1;
        write(d, &count, sizeof(count));
    }

    void event::reset()
    {
        uint64_t count;
        read(d, &count, sizeof(count));
    }

    size_t event::wait_for_multiple(event** _PPEvents, size_t _Count, bool _FWaitAll, unsigned int _Timeout)
    {
        if (_FWaitAll) // not implemented
            std::abort();

        const auto deadline = _Timeout != COOPERATIVE_TIMEOUT_INFINITE ? std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now().time_since_epoch()).count() + _Timeout : COOPERATIVE_TIMEOUT_INFINITE;

        fd_holder ep_fd(epoll_create1(EPOLL_CLOEXEC));
        int fds[_Count];
        for (int i = 0; i < _Count; ++i)
        {
            std::shared_lock<std::shared_mutex> lock(_PPEvents[i]->guard);
            fds[i] = _PPEvents[i]->d;
            if (fds[i] != -1 && watch(ep_fd.GetHandle(), fds[i]) < 0)
                fds[i] = -1;
        }

        epoll_event ep_events[_Count];

        // The epoll_wait call can be interrupted by a signal. Wait out the full timeout, just as on Windows.
        int res = 0;
        while (true)
        {
            res = epoll_wait(ep_fd.GetHandle(), &ep_events[0], _Count, _Timeout);
            if (res == -1 && errno == EINTR && std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now().time_since_epoch()).count() < deadline)
                continue;
            break;
        }

        for (int i = 0; i < _Count; ++i)
        {
            if (fds[i] == -1)
                continue;

            for (int j = 0; j < res; ++j)
                if (ep_events[j].data.fd == fds[i] && (ep_events[j].events & EPOLLIN))
                    return i;
        }

        return COOPERATIVE_WAIT_TIMEOUT;
    }
};
#endif

And then just use concurrency::event.
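
For the question's scenario, a usage sketch (my own illustration, assuming the event class above) would be:

concurrency::event event1;

void thread0()
{
    // doSomething0();
    event1.wait(); // blocks until thread1 calls set()
}

void thread1()
{
    // doSomething1();
    event1.set(); // wakes up thread0
}
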

っ左 2024-10-21 07:24:48


In case someone is interested in an atomic version, here is the implementation. The performance is expected to be better than that of the mutex & condition variable version.

class semaphore_atomic
{
public:
    void notify() {
        count_.fetch_add(1, std::memory_order_release);
    }

    void wait() {
        while (true) {
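            // Note: this spins (busy-waits) until count_ becomes positive.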
            int count = count_.load(std::memory_order_relaxed);
            if (count > 0) {
                if (count_.compare_exchange_weak(count, count-1, std::memory_order_acq_rel, std::memory_order_relaxed)) {
                    break;
                }
            }
        }
    }

    bool try_wait() {
        int count = count_.load(std::memory_order_relaxed);
        if (count > 0) {
            if (count_.compare_exchange_strong(count, count-1, std::memory_order_acq_rel, std::memory_order_relaxed)) {
                return true;
            }
        }
        return false;
    }
private:
    std::atomic_int count_{0};
};