Fast method call scheduling in Python

Posted 2024-09-04 12:51:12


For some part of my project I need a process-local scheduling system that will allow me to delay method execution by a few seconds. I have thousands of “clients” of this system, so using a threading.Timer for each delay is a bad idea because I would quickly reach the OS thread limit. I've implemented a system that uses only one thread for timing control.

The main idea is to keep a sorted queue of tasks (time + func + args + kwargs) and to use a single threading.Timer to schedule/cancel execution of the head of this queue. This scheme works, but I'm not happy with the performance. ~2000 clients scheduling dummy tasks every ~10 seconds cause the process to take 40% of CPU time. Looking at the profiler output, I see that all the time is spent constructing and starting new threading.Timer objects, and in particular on creating new threads.
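For reference, a stripped-down sketch of what I have now looks roughly like this (names and details are simplified, not the exact code):

```python
import bisect
import itertools
import threading
import time


class LightTimer:
    """Sorted task queue plus a single threading.Timer armed for the head task."""

    def __init__(self):
        self._tasks = []                  # sorted (run_at, seq, func, args, kwargs)
        self._seq = itertools.count()     # tie-breaker so tuples stay comparable
        self._lock = threading.Lock()
        self._timer = None

    def schedule(self, delay, func, *args, **kwargs):
        run_at = time.time() + delay
        with self._lock:
            bisect.insort(self._tasks, (run_at, next(self._seq), func, args, kwargs))
            if self._tasks[0][0] == run_at:     # new head -> re-arm the single Timer
                self._rearm()

    def _rearm(self):
        # Cancel the old Timer and build a fresh one for the new head task.
        # Every re-arm constructs and starts a new thread, which is exactly
        # what dominates the profiler output.
        if self._timer is not None:
            self._timer.cancel()
        self._timer = threading.Timer(max(0.0, self._tasks[0][0] - time.time()),
                                      self._fire)
        self._timer.start()

    def _fire(self):
        with self._lock:
            now = time.time()
            while self._tasks and self._tasks[0][0] <= now:
                _, _, func, args, kwargs = self._tasks.pop(0)
                func(*args, **kwargs)     # (runs while holding the lock; simplified)
            if self._tasks:
                self._rearm()
```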

I believe there is a better way. Now I'm thinking about rewriting LightTimer so that there is one execution thread controlled by a threading.Event and several timing threads that set() the event. For example:

  • I schedule a task to be called in 10 secs. The task is added to a queue. Timing thread #1 starts time.sleep(10) before event.set()
  • Then I schedule a task to be called in 11 secs. The task is added to the queue. Nothing happens with the timing threads; they will notice the new task after waking up.
  • Then I schedule a task to be called in 5 secs. The task is prepended to the queue. Timing thread #2 starts time.sleep(5) because #1 is already sleeping for a longer interval.

I hope you've caught the idea (a rough sketch of it follows below). What do you think of this approach? Is there a better way? Maybe I can utilize some Linux system features to make an optimal solution?
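Roughly, I imagine something like this (an untested sketch; all names are placeholders):

```python
import bisect
import itertools
import threading
import time

_tasks = []                  # sorted (run_at, seq, func, args, kwargs)
_pending = []                # wake-up times already covered by sleeping timing threads
_lock = threading.Lock()
_wakeup = threading.Event()
_seq = itertools.count()


def schedule(delay, func, *args, **kwargs):
    run_at = time.time() + delay
    with _lock:
        bisect.insort(_tasks, (run_at, next(_seq), func, args, kwargs))
        # Spawn a timing thread only if no sleeping thread wakes up early enough.
        need_thread = not _pending or run_at < min(_pending)
        if need_thread:
            _pending.append(run_at)
    if need_thread:
        threading.Thread(target=_timing_thread, args=(delay, run_at),
                         daemon=True).start()


def _timing_thread(delay, run_at):
    time.sleep(delay)
    with _lock:
        _pending.remove(run_at)
    _wakeup.set()


def _execution_thread():
    while True:
        _wakeup.wait()
        _wakeup.clear()
        now = time.time()
        due, uncovered = [], None
        with _lock:
            while _tasks and _tasks[0][0] <= now:
                due.append(_tasks.pop(0))
            # If the remaining head is not covered by any sleeping thread,
            # arrange a wake-up for it ourselves.
            if _tasks and (not _pending or _tasks[0][0] < min(_pending)):
                uncovered = _tasks[0][0]
                _pending.append(uncovered)
        if uncovered is not None:
            threading.Thread(target=_timing_thread,
                             args=(max(0.0, uncovered - now), uncovered),
                             daemon=True).start()
        for _, _, func, args, kwargs in due:
            func(*args, **kwargs)


threading.Thread(target=_execution_thread, daemon=True).start()
```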


3 Answers

日久见人心 2024-09-11 12:51:12


An alternative implementation is to use time.time() to calculate the absolute time at which each queued function should be executed. Place this time and your function-to-be-called in an object wrapper that overrides the comparison operators, using the execution time to determine order. Then use the heapq module to maintain a min-heap. This gives you an efficient data structure where element 0 of the heap is always your next event.

One way to implement the actual calls would be to use a separate thread to execute the callbacks. The heap will need to be protected with a mutex, and you can use a condition variable to implement the scheduling. In an infinite loop, just look up the next time to execute a function (element 0 of the heap) and use the condition variable's wait() method with the timeout set to the time remaining until that execution. Your heap insertion method could then use the condition variable's notify() method to wake the scheduling thread early if the newly inserted function should occur prior to the earliest one already in the heap.
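Something along these lines (a rough, untested sketch; a tuple with a sequence number stands in for the wrapper object described above, and the names are made up):

```python
import heapq
import itertools
import threading
import time


class HeapScheduler:
    """One worker thread draining a min-heap of (run_at, seq, func, args, kwargs)."""

    def __init__(self):
        self._heap = []
        self._seq = itertools.count()        # tie-breaker instead of a wrapper class
        self._cond = threading.Condition()
        threading.Thread(target=self._run, daemon=True).start()

    def schedule(self, delay, func, *args, **kwargs):
        run_at = time.time() + delay
        with self._cond:
            earliest = self._heap[0][0] if self._heap else None
            heapq.heappush(self._heap, (run_at, next(self._seq), func, args, kwargs))
            if earliest is None or run_at < earliest:
                # Only wake the worker early when the new task becomes the head.
                self._cond.notify()

    def _run(self):
        while True:
            with self._cond:
                while not self._heap:
                    self._cond.wait()
                timeout = self._heap[0][0] - time.time()
                if timeout > 0:
                    # Wait until the head is due, or until schedule() notifies
                    # us about an even earlier task.
                    self._cond.wait(timeout)
                    continue
                _, _, func, args, kwargs = heapq.heappop(self._heap)
            func(*args, **kwargs)             # run callbacks outside the lock
```

With something like this, HeapScheduler().schedule(10, some_method, arg) would be the whole client-facing API, and only one extra thread exists regardless of how many tasks are queued.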

轮廓§ 2024-09-11 12:51:12


Have you looked at the sched module in the Python standard library? Running the scheduler on a dedicated thread (and having all the scheduled actions be "put a bound method and its args on a queue" from which threads in a pool peel and execute it -- much as I wrote in the Nutshell chapter on threads, except that in that case there was no scheduling) should do what you want.
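A minimal sketch of that split (sched on one dedicated thread, a Queue feeding a small worker pool; everything here is illustrative, not code from the book) could be:

```python
import queue
import sched
import threading
import time

work_q = queue.Queue()                        # scheduled actions land here
scheduler = sched.scheduler(time.time, time.sleep)


def worker():
    # Pool threads peel (bound_method, args) pairs off the queue and run them.
    while True:
        func, args = work_q.get()
        func(*args)
        work_q.task_done()


for _ in range(4):
    threading.Thread(target=worker, daemon=True).start()


def delay_call(delay, bound_method, *args):
    # The scheduled "action" only enqueues the real work for the pool.
    scheduler.enter(delay, 1, work_q.put, ((bound_method, args),))


delay_call(5, print, "five seconds later")
delay_call(10, print, "ten seconds later")
# Run the scheduler on its own dedicated thread; note that sched.run()
# returns once its event queue is empty, so a long-running service would
# need to loop around it or keep feeding it before it drains.
threading.Thread(target=scheduler.run, daemon=True).start()
time.sleep(11)                                # keep the demo process alive
```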

旧情别恋 2024-09-11 12:51:12


You are unlikely to reach the OS thread limit with "a few thousand clients"; you may consume a lot of unnecessary memory with the stacks for all those threads though.

Have a look at what twisted does: it allows a process to multiplex a lot of events (including timers) in a way that has proven to work quite well with large numbers of events.
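For a feel of what that looks like, a toy version of the question's workload on top of twisted's reactor.callLater (the 2000-client / 10-second numbers are taken from the question) might be:

```python
from twisted.internet import reactor


def dummy_task(client_id):
    # Each "client" reschedules itself ~10 seconds later; all the timers are
    # multiplexed on the single reactor thread, with no per-timer OS threads.
    reactor.callLater(10, dummy_task, client_id)


# ~2000 clients, each with a pending timer, is well within one reactor's reach.
for client_id in range(2000):
    reactor.callLater(10, dummy_task, client_id)

reactor.run()
```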

You can also combine the event-driven and multi-process models by running several processes per machine and doing event-driven logic in each one - say one process can handle 2,000 clients, you can still run 30 processes (provided there are sufficient overall resources) and gain better throughput, especially on modern multi-core hardware.
