如何在等待时保持消息泵送?

发布于 2024-07-25 15:45:09 字数 621 浏览 12 评论 0原文

我有一个基于消息泵线程池架构的应用程序。 每当有一个操作可能阻塞时,它就会被实现为“回调完成/触发 evnet”操作,因此它不会停止执行线程。

虽然这种技术适用于大多数情况,但在某些情况下它会变得非常不方便并且使代码过于复杂。

我希望能够做的是,在等待时以透明的方式继续处理事件,而不将函数分解为前/后等待部分。

我该怎么做?

我想到了两个选择:

  1. 在等待时从执行函数内运行消息循环。
  2. 在等待时创建一个新的工作线程,并在恢复时终止它(以适当的方式)。

这两个选项都有其缺陷,仅举几例:

对于 1:

  • 可能会导致堆栈溢出。
  • 可能最终会陷入僵局。
  • 如果内部消息导致等待第二个事件完成,而外部事件同时完成,则外部函数在第二个事件完成之前无法继续,这种情况可能会扩大。

选项 2 最终可能会创建越来越多的线程。

当然,可能还有其他我没有想到的选择。

编辑:语言是C++,因此不能以简单(可移植?)的方式退出和进入函数。 平台是Windows(API),尽管我认为它不相关。

I have an application that is based on a message-pump thread-pool archtecture. Whenever there is an action that could block, it is implemented as "callback on complete/trigger evnet" action, so it won't stall the executing thread.

While this techniqiue is appropriate for most cases, there are situations when it becomes very inconvinient and over-complicates the code.

What I'd like to be able to do is, to keep processing events while waiting, in a transparent way, without breaking the function up into pre/post waiting parts.

How should I do this?

I had two options in mind:

  1. Run the message loop from within the executing function while waiting.
  2. Create a new working thread while waiting, and terminating it (in a proper way) when resuming.

Both options have their flaws, to name a few:

For 1:

  • Could potentially result in stack overflow.
  • Could potentially end up dead-locked.
  • If the inner message results in waiting for a second event to complete, and the outer event completes in the meanwhile, the outer function can't continue until the second event completes, and this situation can expand.

Option 2 can simply end up in creating more and more threads.

Ofcourse, there might be other options that I haven't thought of.

EDIT: Language is C++, so functions can't be stepped out of and into in an easy (portable?) manner. Platform is Windows (API), although I don't think it's relevant.

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(5

冷情妓 2024-08-01 15:45:10

对于可移植的 C++,这是行不通的,但既然您提到您的平台是 Windows,为什么不使用 MsgWaitForMultipleObjects? 它的目的是让你完全按照你的问题所说的去做——在等待的时候继续发送消息。

For portable C++ this won't do, but since you've mentioned your platform is Windows, why not use MsgWaitForMultipleObjects? Its purpose is to let you do exactly what your question says - keep pumping messages while waiting.

找个人就嫁了吧 2024-08-01 15:45:10

编辑:您提到不想“将功能分解为前/后等待部分”。

你用什么语言开发? 如果它有延续(C# 中的yield return),那么这就提供了一种编写代码的方法,该代码看起来是过程性的,但可以轻松地暂停,直到阻塞操作进行完成回调为止。

这是一篇关于这个想法的文章: http://msdn.microsoft.com/en -us/magazine/cc546608.aspx

更新:

不幸的是,语言是 C++

这将成为一个很棒的 T 恤标语。

好的,您可能会发现将顺序代码构造为状态机很有帮助,因此它具有中断/恢复功能。

例如,您的痛苦是需要编写两个函数,一个启动函数,另一个充当完成事件的处理程序:

void send_greeting(const std::string &msg)
{
    std::cout << "Sending the greeting" << std::endl;
    begin_sending_string_somehow(msg, greeting_sent_okay);
}

void greeting_sent_okay()
{
    std::cout << "Greeting has been sent successfully." << std::endl;
}

您的想法是等待:

void send_greeting(const std::string &msg)
{
    std::cout << "Sending the greeting" << std::endl;

    waiter w;
    begin_sending_string_somehow(msg, w);
    w.wait_for_completion();

    std::cout << "Greeting has been sent successfully." << std::endl;
}

在该示例中,waiter 重载了operator(),因此它可以充当回调,并且 wait_for_completion 会以某种方式挂起,直到它看到operator() 已被调用。

我假设 begin_sending_string_somehow 的第二个参数是一个模板参数,它可以是任何不接受参数的可调用类型。

但正如你所说,这有缺点。 每当一个线程像这样等待时,您就添加了另一个潜在的死锁,并且您还消耗了整个线程及其堆栈的“资源”,这意味着必须在其他地方创建更多线程才能完成工作,这与线程池的整体观点相矛盾。

因此,请编写一个类:

class send_greeting
{
    int state_;
    std::string msg_;

public:
    send_greeting(const std::string &msg)
        : state_(0), msg_(msg) {}

    void operator()
    {
        switch (state_++)
        {
            case 0:
                std::cout << "Sending the greeting" << std::endl;
                begin_sending_string_somehow(msg, *this);
                break;

            case 1:
                std::cout << "Greeting has been sent successfully." 
                          << std::endl;
                break;
        }
    }
};

该类实现函数调用运算符 ()。 每次调用它时,它都会执行逻辑中的下一步。 (当然,作为一个如此简单的示例,现在这主要是状态管理噪音,但在具有四个或五个状态的更复杂的示例中,它可能有助于阐明代码的顺序性质)。

问题:

  • 如果事件回调函数签名具有特殊参数,则需要添加另一个 operator() 重载,将参数存储在额外字段中,然后调用无参数重载。 然后它开始变得混乱,因为这些字段将在初始状态下的编译时访问,即使它们在该状态下的运行时没有意义。

  • 如何构造和删除该类的对象? 对象必须存活直到操作完成或被放弃......这是 C++ 的主要陷阱。 我建议实施一个通用方案来管理它。 创建一个“需要删除的东西”的列表,并确保在某些安全点自动发生,即尝试尽可能接近 GC。 距离越远,泄漏的内存就越多。

EDIT: You mention not wanting to "breaking the function up into pre/post waiting parts."

What language are you developing in? If it has continuations (yield return in C#) then that provides a way to write code that appears to be procedural but which can easily be paused until a blocking operation makes its completion callback.

Here's an article about the idea: http://msdn.microsoft.com/en-us/magazine/cc546608.aspx

UPDATE:

Unfortunatly, the language is C++

That would make a great T-shirt slogan.

Okay, so you might find it helpful to structure your sequential code as a state-machine, so it becomes interrupt/resume-capable.

e.g. your pain is needing to write two functions, the one that initiates and the one that acts as the handler for the completion event:

void send_greeting(const std::string &msg)
{
    std::cout << "Sending the greeting" << std::endl;
    begin_sending_string_somehow(msg, greeting_sent_okay);
}

void greeting_sent_okay()
{
    std::cout << "Greeting has been sent successfully." << std::endl;
}

Your idea was to wait:

void send_greeting(const std::string &msg)
{
    std::cout << "Sending the greeting" << std::endl;

    waiter w;
    begin_sending_string_somehow(msg, w);
    w.wait_for_completion();

    std::cout << "Greeting has been sent successfully." << std::endl;
}

In that example, waiter overloads operator() so it can serve as a callback, and wait_for_completion somehow hangs up until it sees that the operator() has been called.

I'm assuming that begin_sending_string_somehow's second parameter is a template parameter that can be any callable type accepting no parameters.

But as you say, this has drawbacks. Any time a thread is waiting like that, you've added another potential deadlock, and you are also consuming the "resource" of a whole thread and its stack, meaning that more threads will have to be created elsewhere to allow work to be done, which is contradictory to the whole point of a thread pool.

So instead, write a class:

class send_greeting
{
    int state_;
    std::string msg_;

public:
    send_greeting(const std::string &msg)
        : state_(0), msg_(msg) {}

    void operator()
    {
        switch (state_++)
        {
            case 0:
                std::cout << "Sending the greeting" << std::endl;
                begin_sending_string_somehow(msg, *this);
                break;

            case 1:
                std::cout << "Greeting has been sent successfully." 
                          << std::endl;
                break;
        }
    }
};

The class implements the function call operator (). Each time it is called, it executes the next step in the logic. (Of course, being such a trivial example, this now is mostly state management noise, but in a more complex example with four or five states it may help clarify the sequential nature of the code).

Problems:

  • If the event callback function signature has special parameters, you'll need to add another overload of operator() that stores the parameters in extra fields and then calls onto the parameterless overload. Then it starts to get messy because those fields will be accessible at compile-time in the initial state, even though they are not meaningful at runtime in that state.

  • How do objects of the class get constructed and deleted? The object has to survive until the operation completes or is abandoned... the central pitfall of C++. I'd recommend implementing a general scheme to manage it. Create a list of "things that will need to be deleted" and ensure that this happens automatically at certain safe points, i.e. try to get as close as possible to GC as you can. The further away you are from that, the more memory you will leak.

蹲墙角沉默 2024-08-01 15:45:10

如果不了解有关您的特定应用程序的更多信息(即处理消息需要多长时间等),就会有很多犹豫:

  • 这是托管还是非托管 C++?

  • 您使用哪个线程池?

    • 队列用户工作项?
    • 您通过 CreateIoCompletionPort 拥有自己的池?
    • 还是 Vista 的 SubmitThreadpoolWork?

我认为平台有些相关,因为线程池的性质很重要。

例如:

如果您使用 (完成端口 )为您的线程池(即CreateIoCompletionPort)。 您可以控制并发运行的线程数量(从而控制最终创建的线程总数)。 如果您将最大并发线程数设置为 4。Windows 将尝试仅允许 4 个线程同时运行。 如果所有 4 个线程都忙于处理,并且您将第 5 个项目排队,那么 Windows 将不允许该项目运行,直到 4 个线程之一完成(重用线程)。 唯一违反此规则的情况是当线程被阻塞(即等待 I/O)时,然后允许更多线程运行。

这是了解完成端口以及平台为何相关的重要内容。 在不涉及内核的情况下实现这样的事情是非常困难的。 了解繁忙线程和阻塞线程之间的区别需要访问线程状态。 就进入内核的上下文切换数量而言,完成端口也非常高效。

回到您的问题:

您似乎应该有一个线程来处理/分派消息,并且消息处理都是通过将工作线程推送到线程池来处理的。 让完成端口处理负载平衡和并发。 您的消息处理循环永远不会阻塞,并且可以继续处理消息。

如果传入消息的速率远远超出您的处理能力,那么您可能需要注意队列大小,并在队列变得太大时进行阻塞。

Without knowing more about your specific application (ie how long messages take to process etc..) there will be lots of handwaving:

  • Is this managed or unmanaged C++?

  • Which ThreadPool are you using?

    • QueueUserWorkItem?
    • Your own pool via CreateIoCompletionPort?
    • Or Vista's SubmitThreadpoolWork?

I think platform is somewhat relevant as the nature of the Thread Pool is important.

For example:

If you use (Completion Ports) for your thread pool (ie CreateIoCompletionPort). You have some control on how many threads run concurrently (and hence on how many total threads are ultimately created). If you set the maximum number of concurrent threads to say 4. Windows will will attempt to only allow 4 threads to run concurrently. If all 4 threads are busy processing and you queue a 5th item then windows will not allow that item to run until one of the 4 if finished (reusing the thread). The only time this rule is broken is when threads are blocked (ie waiting on I/O), then more threads are allowed to run.

This is the important thing to understand about Completion Ports, and why platform is relevant. It is very difficult to implement something like this without involving the Kernel. Knowing the difference between busy threads and blocked threads requires access to Thread states. Completion ports are very efficient with respect to number of context switches into the Kernel too.

Back to your question:

It would seem that you should have one thread to process/dispatch the messages and the message processing is all handled by pushing workers onto a thread pool. Let Completion ports handle the load balancing and concurrency. You message processing loop will never block and can continue to process messages.

If the rate of incoming messages far exceeds your ability to process them then you will probably have to pay attention to your queue size and block when it grows too large.

诗酒趁年少 2024-08-01 15:45:10

看来你的问题是根本性的,与 C++ 无关。 其他语言在隐藏堆栈使用方面可能更好,但只要您没有从 Foo() 返回,您就需要 Foo() 的调用堆栈。 如果您还执行 Bar(),那么它也需要一个调用堆栈。

线程是一个很好的方法,因为每个线程都有自己的调用堆栈。 延续是保存调用堆栈的一种聪明但复杂的方法,因此在可用的情况下,这也是一种选择。 但如果您不想要这些,则必须使用一个调用堆栈。

使用一个调用堆栈需要解决重入问题。 在这里,对于什么是可能的,没有通用的答案。 一般来说,您将拥有一组消息 M1..Mx,它们由函数 F1...Fy 处理,并具有一些特定于应用程序和可能与状态相关的映射。 对于可重入消息循环,您可能在收到 Mj 时正在执行 Fi。 现在的问题是该怎么办。 并非所有函数 F1...Fn 都可以调用; 特别是 Fi 本身可能不可调用。 然而,一些其他功能也可能不可用,例如因为它们共享资源。 这取决于应用程序。

如果 Mj 的处理需要任何这些不可用的函数,则必须推迟它。 您可以接受队列中的下一条消息吗? 同样,这取决于实现,甚至可能与消息类型和内容有关。 如果消息足够独立,则可以乱序执行它们。 这很快就会变得相当复杂 - 为了确定是否可以接受队列中的第 N 条消息,您必须检查它是否可以相对于前面的 N-1 条消息乱序执行。

语言可以通过不隐藏依赖关系来帮助您,但最终您必须做出明确的决定。 没有灵丹妙药。

It seems your problem is fundamental, and not related to C++. Other languages are perhaps better in hiding the stack usage, but as long as you haven't returned from Foo() you need the call stack for Foo(). And if you're also executing Bar(), that too needs a callstack.

Threads are an excellent approach to this, as each thread comes with its own callstack. Continuations are a smart but complicated way to save callstacks, so where available those are an option too. But if you don't want those, you'll have to make do with one callstack.

Daling with one callstack requires addressing reentrancy. Here, there's no generic answer on what's possible. In general, you will have a set of Messages M1..Mx which are handled by functions F1...Fy, with some application-specific and possibly state-dependent mapping. With a reentrant message loop, you might be executing Fi when you receive Mj. Now the problem is what to do. Not all functions F1...Fn may be callable; in particular Fi itself may not be callable. However, some other functions may also be unavailable, e.g. because they share resources. This is application-dependent.

If the processing of Mj requires any of these unavailable functions, you have to postpone it. Can you accept the next message in the queue? Again, that's implemenation dependent, and it may even relate to the message type and content. If the messages are sufficiently independent, it's possible to execute them out of order. This quickly becomes rather complex - to determine whether it's possible to accept the Nth message in the queue, you have to check if it can be executed out-of-order with respect to the preceding N-1 messages.

A language can help you by not hiding dependencies, but in the end you must make the explicit decisions. There's no silver bullet.

猫烠⑼条掵仅有一顆心 2024-08-01 15:45:10

您的问题是同步线程对吧? 如果这是您的问题,为什么不使用互斥锁呢? 它可以用一个接口包裹起来。 事实上,您可以使用 PIMPL 习惯用法使互斥体可移植。

http://msdn.microsoft.com /en-us/library/system.threading.mutex(VS.71).aspx

Your problem is synchronizing the threads right? If that is your problem, why not use a mutex? It could be wrapped up with an interface. In fact you could use the PIMPL idiom to make the mutex portable.

http://msdn.microsoft.com/en-us/library/system.threading.mutex(VS.71).aspx

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文