我想等待文件描述符和互斥体,推荐的方法是什么?

发布于 2024-10-30 01:31:42 字数 710 浏览 6 评论 0原文

我想生成线程来执行某些任务,并使用线程安全队列与它们进行通信。我还想在等待时对各种文件描述符进行 IO。

完成此任务的推荐方法是什么?当队列从无元素变为某些元素时,我是否必须创建一个线程间管道并写入它?难道就没有更好的办法吗?

如果我必须创建线程间管道,为什么没有更多实现共享队列的库允许您将共享队列和线程间管道创建为单个实体?

我想这样做是否意味着存在根本的设计缺陷?

我问的是关于 C++ 和 Python 的问题。我对跨平台解决方案有点感兴趣,但主要对 Linux 感兴趣。

对于更具体的例子......

我有一些代码将在文件系统树中搜索内容。我有几个通过套接字向外界开放的通信通道。可能(或可能不会)导致需要在文件系统树中搜索内容的请求将会到达。

我将在一个或多个线程中隔离在文件系统树中搜索内容的代码。我想接受导致需要搜索树的请求,并将它们放入由搜索器线程完成的线程安全队列中。结果将被放入已完成搜索的队列中。

我希望能够在搜索进行时快速满足所有非搜索请求。我希望能够及时对搜索结果采取行动。

为传入请求提供服务通常意味着某种使用epoll的事件驱动架构。磁盘搜索请求队列和结果返回队列意味着使用互斥体或信号量来实现线程安全的线程安全队列。

等待空队列的标准方法是使用条件变量。但如果我在等待时需要满足其他请求,那么这将不起作用。要么我最终一直轮询结果队列(平均将结果延迟轮询间隔的一半),阻塞并且不服务请求。

I would like to spawn off threads to perform certain tasks, and use a thread-safe queue to communicate with them. I would also like to be doing IO to a variety of file descriptors while I'm waiting.

What's the recommended way to accomplish this? Do I have to created an inter-thread pipe and write to it when the queue goes from no elements to some elements? Isn't there a better way?

And if I have to create the inter-thread pipe, why don't more libraries that implement shared queues allow you to create the shared queue and inter-thread pipe as a single entity?

Does the fact I want to do this at all imply a fundamental design flaw?

I'm asking this about both C++ and Python. And I'm mildly interested in a cross-platform solution, but primarily interested in Linux.

For a more concrete example...

I have some code which will be searching for stuff in a filesystem tree. I have several communications channels open to the outside world through sockets. Requests that may (or may not) result in a need to search for stuff in the filesystem tree will be arriving.

I'm going to isolate the code that searches for stuff in the filesystem tree in one or more threads. I would like to take requests that result in a need to search the tree and put them in a thread-safe queue of things to be done by the searcher threads. The results will be put into a queue of completed searches.

I would like to be able to service all the non-search requests quickly while the searches are going on. I would like to be able to act on the search results in a timely fashion.

Servicing the incoming requests would generally imply some kind of event-driven architecture that uses epoll. The queue of disk-search requests and the return queue of results would imply a thread-safe queue that uses mutexes or semaphores to implement the thread safety.

The standard way to wait on an empty queue is to use a condition variable. But that won't work if I need to service other requests while I'm waiting. Either I end up polling the results queue all the time (and delaying the results by half the poll interval, on average), blocking and not servicing requests.

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(8

儭儭莪哋寶赑 2024-11-06 01:31:42

每当使用事件驱动架构时,都需要有一种机制来报告事件完成情况。在 Linux 上,如果使用文件,则需要使用 select 或 poll 系列中的某些内容,这意味着必须使用管道来启动所有与文件无关的事件。

编辑:Linux有eventfdtimerfd。这些可以添加到您的 epoll 列表中,并分别在从另一个线程触发或在计时器事件上触发时用于中断 epoll_wait

还有另一种选择,那就是信号。可以使用 fcntl 修改文件描述符,以便在文件描述符变为活动状态时发出信号。然后,信号处理程序可以将文件就绪消息推送到您选择的任何类型的队列中。这可能是一个简单的信号量或互斥/条件变量驱动的队列。由于现在不再使用select/poll,因此不再需要使用管道来对非基于文件的消息进行排队。

健康警告:我还没有尝试过这个,虽然我不明白为什么它不起作用,但我真的不知道 signal 方法的性能影响。

编辑:在信号处理程序中操作互斥锁可能是一个非常糟糕的主意。

Whenever one uses an event driven architecture, one is required to have a single mechanism to report event completion. On Linux, if one is using files, one is required to use something from the select or poll family meaning that one is stuck with using a pipe to initiate all none file related events.

Edit: Linux has eventfd and timerfd. These can be added to your epoll list and used to break out of the epoll_wait when either triggered from another thread or on a timer event respectively.

There is another option and that is signals. One can use fcntl modify the file descriptor such that a signal is emitted when the file descriptor becomes active. The signal handler may then push a file-ready message onto any type of queue of your choosing. This may be a simple semaphore or mutex/condvar driven queue. Since one is now no longer using select/poll, one no longer needs to use a pipe to queue none file based messages.

Health warning: I have not tried this and although I cannot see why it will not work, I don't really know the performance implications of the signal approach.

Edit: Manipulating a mutex in a signal handler is probably a very bad idea.

兮子 2024-11-06 01:31:42

我已经使用你提到的 pipeline() 和 libevent (它包装了 epoll)解决了这个问题。当工作线程的输出队列从空变为非空时,工作线程将一个字节写入其管道FD。这会唤醒主 IO 线程,然后主线程可以获取工作线程的输出。这很好用,实际上编码非常简单。

I've solved this exact problem using what you mention, pipe() and libevent (which wraps epoll). The worker thread writes a byte to its pipe FD when its output queue goes from empty to non-empty. That wakes up the main IO thread, which can then grab the worker thread's output. This works great is actually very simple to code.

计㈡愣 2024-11-06 01:31:42

你有 Linux 标签,所以我要扔掉这个: POSIX 消息队列 做所有这应该满足您的“内置”请求,如果不是您不太想要的跨平台愿望。

线程安全同步是内置的。您可以让工作线程在读取队列时阻塞。或者,当队列中放入新项目时,MQ 可以使用 mq_notify() 生成一个新线程(或向现有线程发出信号)。由于看起来您将使用 select(),因此 MQ 的标识符 (mqd_t) 可以用作 select 的文件描述符。

You have the Linux tag so I am going to throw this out: POSIX Message Queues do all this, which should fulfill your "built-in" request if not your less desired cross-platform wish.

The thread-safe synchronization is built-in. You can have your worker threads block on read of the queue. Alternatively MQs can use mq_notify() to spawn a new thread (or signal an existing one) when there is a new item put in the queue. And since it looks like you are going to be using select(), MQ's identifier (mqd_t) can be used as a file descriptor with select.

挽你眉间 2024-11-06 01:31:42

似乎还没有人提到这个选项:

不要运行 select/poll/etc。在你的“主线程”中。启动一个专用的辅助线程,该线程执行 I/O 操作,并在 I/O 操作完成时将通知推送到线程安全队列(其他线程用于与主线程通信的队列相同)。

然后你的主线程只需要等待通知队列。

It seems nobody has mentioned this option yet:

Don't run select/poll/etc. in your "main thread". Start a dedicated secondary thread which does the I/O and pushes notifications into your thread-safe queue (the same queue which your other threads use to communicate with the main thread) when I/O operations complete.

Then your main thread just needs to wait on the notification queue.

草莓味的萝莉 2024-11-06 01:31:42

在我看来,Duck 和 twk 的答案实际上比 doron 的(OP 选择的答案)更好。 doron 建议从信号处理程序的上下文中写入消息队列,并指出消息队列可以是“任何类型的队列”。我强烈警告您不要这样做,因为许多 C 库/系统调用无法从信号处理程序中安全地调用(请参阅 异步信号安全)。

特别是,如果您选择受互斥锁保护的队列,则不应从信号处理程序访问它。考虑这种情况:您的消费者线程锁定队列来读取它。之后,内核立即发出信号来通知您文件描述符现在有数据。您信号处理程序在消费者线程中运行(必然),并尝试将某些内容放入队列中。为此,它首先必须获取锁。但它已经持有锁了,所以你现在陷入了僵局。

根据我的经验,select/poll 是 UNIX/Linux 中事件驱动程序的唯一可行的解​​决方案。我希望多线程程序中有更好的方法,但是您需要某种机制来“唤醒”您的消费者线程。我还没有找到一种不涉及系统调用的方法(因为在任何阻塞调用(例如 select)期间,消费者线程位于内核内的等待队列上)。

编辑:我忘了提及一种使用 select/poll 时处理信号的特定于 Linux 的方法:signalfd(2)。您获得一个可以选择/轮询的文件描述符,并且处理代码正常运行,而不是在信号处理程序的上下文中运行。

Duck's and twk's are actually better answers than doron's (the one selected by the OP), in my opinion. doron suggests writing to a message queue from within the context of a signal handler, and states that the message queue can be "any type of queue." I would strongly caution you against this since many C library/system calls cannot safely be called from within a signal handler (see async-signal-safe).

In particuliar, if you choose a queue protected by a mutex, you should not access it from a signal handler. Consider this scenario: your consumer thread locks the queue to read it. Immediately after, the kernel delivers the signal to notify you that a file descriptor now has data on it. You signal handler runs in the consumer thread, necessarily), and tries to put something on your queue. To do this, it first has to take the lock. But it already holds the lock, so you are now deadlocked.

select/poll is, in my experience, the only viable solution to an event-driven program in UNIX/Linux. I wish there were a better way inside a mutlithreaded program, but you need some mechanism to "wake up" your consumer thread. I have yet to find a method that does not involve a system call (since the consumer thread is on a waitqueue inside the kernel during any blocking call such as select).

EDIT: I forgot to mention one Linux-specific way to handle signals when using select/poll: signalfd(2). You get a file descriptor you can select/poll on, and you handling code runs normally instead of in a signal handler's context.

守望孤独 2024-11-06 01:31:42

这是一个非常常见的问题,尤其是当您开发网络服务器端程序时。大多数Linux服务器端程序的主要外观将像这样循环:

epoll_add(serv_sock);
while(1){
    ret = epoll_wait();
    foreach(ret as fd){
        req = fd.read();
        resp = proc(req);
        fd.send(resp);
    }
}

它是单线程(主线程)、基于epoll的服务器框架。问题是,它是单线程的,而不是多线程的。它要求 proc() 永远不应阻塞或运行很长时间(例如,常见情况为 10 毫秒)。

如果 proc() 将运行很长时间,我们需要多线程,并在单独的线程(工作线程)中执行 proc()。

我们可以在不阻塞主线程的情况下将任务提交给工作线程,使用基于互斥的消息队列,它足够快。

epoll_add(serv_sock);
while(1){
    ret = epoll_wait();
    foreach(ret as fd){
        req = fd.read();
        queue.add_job(req); // fast, non blockable
    }
}

那么我们需要一种方法来从工作线程获取任务结果。如何?如果我们只是直接检查消息队列,在epoll_wait()之前或之后。

epoll_add(serv_sock);
while(1){
    ret = epoll_wait(); // may blocks for 10ms
    resp = queue.check_result(); // fast, non blockable
    foreach(ret as fd){
        req = fd.read();
        queue.add_job(req); // fast, non blockable
    }
}

然而,检查动作将在 epoll_wait() 结束后执行,如果 epoll_wait() 等待的所有文件描述符都不是活动的,则通常会阻塞 10 微秒(常见情况)。

对于服务器来说,10毫秒是相当长的时间了!我们可以在任务结果生成时通知 epoll_wait() 立即结束吗?

是的!我将在我的一个开源项目中描述它是如何完成的:

为所有工作线程创建一个管道,并且 epoll 也在该管道上等待。一旦任务结果产生,工作线程向管道写入一个字节,然后 epoll_wait() 几乎同时结束! - Linux 管道有 5 us 到 20 us 的延迟。


在我的项目SSDB(一个Redis协议兼容的盘内NoSQL数据库)中,我创建了一个SelectableQueue用于传递主线程和工作线程之间的消息。正如它的名字一样,SelectableQueue 有一个文件描述符,可以通过 epoll 等待。

SelectableQueue: https://github.com/ideawu/ ssdb/blob/master/src/util/thread.h#L94

主线程中的用法:

epoll_add(serv_sock);
epoll_add(queue->fd());
while(1){
    ret = epoll_wait();
    foreach(ret as fd){
        if(fd is queue){
            sock, resp = queue->pop_result();
            sock.send(resp);
        }
        if(fd is client_socket){
            req = fd.read();
            queue->add_task(fd, req);
        }
    }
}

工作线程中的用法:

fd, req = queue->pop_task();
resp = proc(req);
queue->add_result(fd, resp);

This is a very common seen problem, especially when you are developing network server-side program. Most Linux server-side program's main look will loop like this:

epoll_add(serv_sock);
while(1){
    ret = epoll_wait();
    foreach(ret as fd){
        req = fd.read();
        resp = proc(req);
        fd.send(resp);
    }
}

It is single threaded(the main thread), epoll based server framework. The problem is, it is single threaded, not multi-threaded. It requires that proc() should never blocks or runs for a significant time(say 10 ms for common cases).

If proc() will ever runs for a long time, WE NEED MULTI THREADS, and executes proc() in a separated thread(the worker thread).

We can submit task to the worker thread without blocking the main thread, using a mutex based message queue, it is fast enough.

epoll_add(serv_sock);
while(1){
    ret = epoll_wait();
    foreach(ret as fd){
        req = fd.read();
        queue.add_job(req); // fast, non blockable
    }
}

Then we need a way to obtain the task result from a worker thread. How? If we just check the message queue directly, before or after epoll_wait().

epoll_add(serv_sock);
while(1){
    ret = epoll_wait(); // may blocks for 10ms
    resp = queue.check_result(); // fast, non blockable
    foreach(ret as fd){
        req = fd.read();
        queue.add_job(req); // fast, non blockable
    }
}

However, the checking action will execute after epoll_wait() to end, and epoll_wait() usually blocks for 10 micro seconds(common cases) if all file descriptors it waits are not active.

For a server, 10 ms is quite a long time! Can we signal epoll_wait() to end immediately when task result is generated?

Yes! I will describe how it is done in one of my open source project:

Create a pipe for all worker threads, and epoll waits on that pipe as well. Once a task result is generated, the worker thread writes one byte into the pipe, then epoll_wait() will end in nearly the same time! - Linux pipe has 5 us to 20 us latency.


In my project SSDB(a Redis protocol compatible in-disk NoSQL database), I create a SelectableQueue for passing messages between the main thread and worker threads. Just like its name, SelectableQueue has an file descriptor, which can be wait by epoll.

SelectableQueue: https://github.com/ideawu/ssdb/blob/master/src/util/thread.h#L94

Usage in main thread:

epoll_add(serv_sock);
epoll_add(queue->fd());
while(1){
    ret = epoll_wait();
    foreach(ret as fd){
        if(fd is queue){
            sock, resp = queue->pop_result();
            sock.send(resp);
        }
        if(fd is client_socket){
            req = fd.read();
            queue->add_task(fd, req);
        }
    }
}

Usage in worker thread:

fd, req = queue->pop_task();
resp = proc(req);
queue->add_result(fd, resp);
逐鹿 2024-11-06 01:31:42

C++11 有 std::mutex 和 std::condition_variable。当满足某一条件时,这两个线程可用于让一个线程向另一个线程发出信号。在我看来,您需要根据这些原语构建您的解决方案。如果您的环境尚不支持这些 C++11 库功能,您可以在 boost 中找到非常相似的功能。抱歉,关于 python 不能说太多。

C++11 has std::mutex and std::condition_variable. The two can be used to have one thread signal another when a certain condition is met. It sounds to me like you will need to build your solution out of these primitives. If you environment does not yet support these C++11 library features, you can find very similar ones at boost. Sorry, can't say much about python.

微凉 2024-11-06 01:31:42

完成您想要做的事情的一种方法是实现 观察者模式

您将注册您的主线程作为所有生成线程的观察者,并让它们在完成应做的事情时通知它(或在运行期间使用您需要的信息进行更新)。

基本上,您想要改变事件驱动模型的方法。

One way to accomplish what you're looking to do is by implementing the Observer Pattern

You would register your main thread as an observer with all your spawned threads, and have them notify it when they were done doing what they were supposed to (or updating during their run with the info you need).

Basically, you want to change your approach to an event-driven model.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文