如何反转 set_value() 和“停用”一个承诺?

发布于 2024-11-18 02:52:56 字数 152 浏览 6 评论 0原文

我在这里有一个关于同步的问题。我有一个“编写者”线程,它在每次迭代时为承诺分配不同的值“p”。我需要“读取器”线程等待该值的shared_futures,然后处理它们,我的问题是如何使用 future/promise 来确保读取器线程在执行处理任务之前等待“p”的新更新每次迭代?非常感谢。

I have a total n00b question here on synchronization. I have a 'writer' thread which assigns a different value 'p' to a promise at each iteration. I need 'reader' threads which wait for shared_futures of this value and then process them, and my question is how do I use future/promise to ensure that the reader threads wait for a new update of 'p' before performing their processing task at each iteration? Many thanks.

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(4

¢好甜 2024-11-25 02:52:56

您可以通过将 Promise 分配给空白 Promise 来“重置”它。

myPromise = promise< int >();

更完整的示例:

promise< int > myPromise;

void writer()
{
    for( int i = 0; i < 10; ++i )
    {
        cout << "Setting promise.\n";
        myPromise.set_value( i );

        myPromise = promise< int >{};       // Reset the promise.

        cout << "Waiting to set again...\n";
        this_thread::sleep_for( chrono::seconds( 1 ));
    }
}

void reader()
{
    int result;
    do
    {
        auto myFuture = myPromise.get_future();
        cout << "Waiting to receive result...\n";
        result = myFuture.get();
        cout << "Received " << result << ".\n";
    } while( result < 9 );
}

int main()
{
    std::thread write( writer );
    std::thread read( reader );

    write.join();
    read.join();

    return 0;
}

但是,这种方法的一个问题是,两个线程之间的同步可能会导致编写器在读取器调用 之间多次调用 promise::set_value() future::get()future::get() 在重置 Promise 时被调用。这些问题可以小心地避免(例如,在调用之间进行适当的睡眠),但这将我们带入黑客和猜测的领域,而不是逻辑上正确的并发性。

因此,尽管可以通过将 Promise 分配给新的 Promise 来重置 Promise,但这样做往往会引发更广泛的同步问题。

You can "reset" a promise by assigning it to a blank promise.

myPromise = promise< int >();

A more complete example:

promise< int > myPromise;

void writer()
{
    for( int i = 0; i < 10; ++i )
    {
        cout << "Setting promise.\n";
        myPromise.set_value( i );

        myPromise = promise< int >{};       // Reset the promise.

        cout << "Waiting to set again...\n";
        this_thread::sleep_for( chrono::seconds( 1 ));
    }
}

void reader()
{
    int result;
    do
    {
        auto myFuture = myPromise.get_future();
        cout << "Waiting to receive result...\n";
        result = myFuture.get();
        cout << "Received " << result << ".\n";
    } while( result < 9 );
}

int main()
{
    std::thread write( writer );
    std::thread read( reader );

    write.join();
    read.join();

    return 0;
}

A problem with this approach, however, is that synchronization between the two threads can cause the writer to call promise::set_value() more than once between the reader's calls to future::get(), or future::get() to be called while the promise is being reset. These problems can be avoided with care (e.g. with proper sleeping between calls), but this takes us into the realm of hacking and guesswork rather than logically correct concurrency.

So although it's possible to reset a promise by assigning it to a fresh promise, doing so tends to raise broader synchronization issues.

谁的新欢旧爱 2024-11-25 02:52:56

promise/future 对被设计为仅携带单个值(或异常)。要执行您所描述的操作,您可能需要采用不同的工具。

如果您希望多个线程(您的读者)全部停止在一个公共点,您可以考虑使用屏障

A promise/future pair is designed to carry only a single value (or exception.). To do what you're describing, you probably want to adopt a different tool.

If you wish to have multiple threads (your readers) all stop at a common point, you might consider a barrier.

无戏配角 2024-11-25 02:52:56

以下代码演示了如何使用 futurepromise 实现生产者/消费者模式。

有两个 promise 变量,由生产者线程和消费者线程使用。每个线程都会重置两个 promise 变量之一并等待另一个。

#include <iostream>
#include <future>
#include <thread>
using namespace std;

// produces integers from 0 to 99
void producer(promise<int>& dataready, promise<void>& consumed)
{
    for (int i = 0; i < 100; ++i) {
        // do some work here ...
        consumed = promise<void>{};      // reset
        dataready.set_value(i);          // make data available
        consumed.get_future().wait();    // wait for the data to be consumed
    
    }
    dataready.set_value(-1);                     // no more data
}

// consumes integers
void consumer(promise<int>& dataready, promise<void>& consumed)
{
    for (;;) {
        int n = dataready.get_future().get();    // wait for data ready
        if (n >= 0) {
            std::cout << n << ",";
            dataready = promise<int>{};  // reset
            consumed.set_value();        // mark data as consumed
            // do some work here ...
        }
        else
            break;
    }
}

int main(int argc, const char*argv[])
{
    promise<int> dataready{};
    promise<void> consumed{};

    thread th1([&] {producer(dataready, consumed); });
    thread th2([&] {consumer(dataready, consumed); });

    th1.join();
    th2.join();
    std::cout  << "\n";

    return 0;
}

The following code demonstrates how the producer/consumer pattern can be implemented with future and promise.

There are two promise variables, used by a producer and a consumer thread. Each thread resets one of the two promise variables and waits for the other one.

#include <iostream>
#include <future>
#include <thread>
using namespace std;

// produces integers from 0 to 99
void producer(promise<int>& dataready, promise<void>& consumed)
{
    for (int i = 0; i < 100; ++i) {
        // do some work here ...
        consumed = promise<void>{};      // reset
        dataready.set_value(i);          // make data available
        consumed.get_future().wait();    // wait for the data to be consumed
    
    }
    dataready.set_value(-1);                     // no more data
}

// consumes integers
void consumer(promise<int>& dataready, promise<void>& consumed)
{
    for (;;) {
        int n = dataready.get_future().get();    // wait for data ready
        if (n >= 0) {
            std::cout << n << ",";
            dataready = promise<int>{};  // reset
            consumed.set_value();        // mark data as consumed
            // do some work here ...
        }
        else
            break;
    }
}

int main(int argc, const char*argv[])
{
    promise<int> dataready{};
    promise<void> consumed{};

    thread th1([&] {producer(dataready, consumed); });
    thread th2([&] {consumer(dataready, consumed); });

    th1.join();
    th2.join();
    std::cout  << "\n";

    return 0;
}
等往事风中吹 2024-11-25 02:52:56

这个答案支持 Jeff Wofford 给出的答案,马纳古法比奥

杰夫和法比奥的解决方案有效。但要澄清的是,解决方案中发生的情况是,所谓的承诺重置并不是按所说重置承诺,而只是将新的承诺分配到为承诺所做的空间分配中。在 Fabio 的解决方案中,此分配是在 main 的堆栈上进行的,而在 Jeff 的解决方案中,它是一个全局变量。在每次迭代中,新的 Promise 是一个完全不同的 Promise,并且具有新的共享状态,并且从前一个 Promise 检索到的任何 future 都不会引用这个新的共享状态。

因此,这些解决方案并不完全是重置,而是回收承诺。每次迭代都会发生新的共享状态分配和解除分配。

正如马纳古提到的,

promise/future 对被设计为仅携带单个值(或
例外。)。

它用作单次异步数据通信机制。

This answer is in support of, along with a tiny bit of clarification, the answers given by Jeff Wofford, Managu and Fabio.

Jeff's and Fabio's solutions work. But just to make it clear, what is happening in the solutions is that the so called reset of the promise is not resetting the promise per say, but just move assigning a new promise into the space allocation made for the promise. In Fabio's solution this allocation is made on the stack in main, and in Jeff's solution it is a global variable. In each iteration, the new promise is a completely different promise, and has a new shared state and any future retrieved from the previous promise is not referring to this new shared state.

So these solutions are not exactly resetting, but recycling the promise. New shared state allocation and de-allocation is happening in each iteration.

As mentioned by Managu,

A promise/future pair is designed to carry only a single value (or
exception.).

It serves as a single shot asynchronous data communication mechanism.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文