How can I efficiently perform many tasks "later" in Python?


I have a process that needs to perform a bunch of actions "later" (usually after 10-60 seconds). The problem is that there can be a lot of those "later" actions (1000s), so using a thread per task is not viable. I know of tools like gevent and eventlet, but one of the problems is that the process uses zeromq for communication, so I would need some integration (eventlet already has it).

What I'm wondering is: what are my options? Suggestions are welcome, along the lines of libraries (if you've used any of those mentioned, please share your experience), techniques (Python's "coroutine" support, one thread that sleeps for a while and checks a queue), ways to make use of zeromq's poll or event loop to do the job, or something else.
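To make the zeromq option concrete, here is a rough sketch of the kind of integration I have in mind: a single poll loop whose timeout is derived from a heap of deadlines. The socket type, endpoint, and the 10-second delay are hypothetical:

import heapq, time, zmq

context = zmq.Context()
socket = context.socket(zmq.PULL)  # hypothetical socket type/endpoint
socket.bind("tcp://127.0.0.1:5555")

poller = zmq.Poller()
poller.register(socket, zmq.POLLIN)

deadlines = []  # heap of (due_time, action)

def later(delay, action):
    heapq.heappush(deadlines, (time.time() + delay, action))

while True:
    # poll no longer than the time until the next deadline (zmq wants ms)
    timeout = None
    if deadlines:
        timeout = max(0, int((deadlines[0][0] - time.time()) * 1000))
    events = dict(poller.poll(timeout))
    if socket in events:
        later(10, socket.recv())  # an incoming message schedules work for later
    now = time.time()
    while deadlines and deadlines[0][0] <= now:
        _, action = heapq.heappop(deadlines)
        print(now, "performing", action)  # the deferred work goes here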

10 Answers

故乡的云 2024-12-01 16:10:27

Consider using a priority queue with one or more worker threads to service the tasks. The main thread can add work to the queue, with a timestamp of the soonest it should be serviced. Worker threads pop work off the queue, sleep until the time in the priority value is reached, do the work, and then pop another item off the queue.

How about a more fleshed-out answer? mklauber makes a good point: if there's a chance all of your workers might be sleeping when you have new, more urgent work, then a queue.PriorityQueue isn't really the solution, although a "priority queue" is still the technique to use, available from the heapq module. Instead, we'll make use of a different synchronization primitive: a condition variable, which in Python is spelled threading.Condition.

The approach is fairly simple: peek at the heap, and if the work is current, pop it off and do that work. If there is work but it's scheduled in the future, wait on the condition until then; if there's no work at all, sleep forever.

The producer does its fair share of the work: every time it adds new work, it notifies the condition, so if there are sleeping workers, they'll wake up and recheck the queue for newer work.

import heapq, time, threading

START_TIME = time.time()
SERIALIZE_STDOUT = threading.Lock()

def consumer(message):
    """the actual work function.  never mind the lock here, it just keeps
       the output nicely formatted.  a real work function probably won't need
       it, or might need quite different synchronization"""
    with SERIALIZE_STDOUT:
        print(time.time() - START_TIME, message)

def produce(work_queue, condition, timeout, message):
    """called to put a single item onto the work queue."""
    prio = time.time() + float(timeout)
    with condition:
        heapq.heappush(work_queue, (prio, message))
        condition.notify()

def worker(work_queue, condition):
    condition.acquire()
    stopped = False
    while not stopped:
        now = time.time()
        if work_queue:
            prio, data = work_queue[0]
            if data == 'stop':
                # leave 'stop' on the heap so the other workers see it too
                stopped = True
                continue
            if prio < now:
                heapq.heappop(work_queue)
                condition.release()
                # do some work!
                consumer(data)
                condition.acquire()
            else:
                # the next item is due in the future; wait until then, or
                # until the producer notifies us of newer, more urgent work
                condition.wait(prio - now)
        else:
            # the queue is empty, wait until notified
            condition.wait()
    condition.release()

if __name__ == '__main__':
    # first set up the work queue and worker pool
    work_queue = []
    cond = threading.Condition()
    pool = [threading.Thread(target=worker, args=(work_queue, cond))
            for _ignored in range(4)]
    for thread in pool:
        thread.start()

    # now add some work
    produce(work_queue, cond, 10, 'Grumpy')
    produce(work_queue, cond, 10, 'Sneezy')
    produce(work_queue, cond, 5, 'Happy')
    produce(work_queue, cond, 10, 'Dopey')
    produce(work_queue, cond, 15, 'Bashful')
    time.sleep(5)
    produce(work_queue, cond, 5, 'Sleepy')
    produce(work_queue, cond, 10, 'Doc')

    # and just to make the example a bit more friendly, tell the threads to
    # stop after all the work is done ('stop' sorts last, with infinite priority)
    produce(work_queue, cond, float('inf'), 'stop')
    for thread in pool:
        thread.join()
瑕疵 2024-12-01 16:10:27

This answer actually contains two suggestions: my first one, and another I discovered after the first one.

sched

I suspect you are looking for the sched module.

EDIT: my bare suggestion did not seem very helpful after I reread it, so I decided to test the sched module and see whether it could work the way I suggested. Here is my test: I would use it with a single thread, more or less this way:

import sched, threading, time

class SchedulingThread(threading.Thread):

    def __init__(self):
        threading.Thread.__init__(self)
        self.scheduler = sched.scheduler(time.time, time.sleep)
        self.queue = []
        self.queue_lock = threading.Lock()
        self.scheduler.enter(1, 1, self._schedule_in_scheduler, ())

    def run(self):
        self.scheduler.run()

    def schedule(self, function, delay):
        with self.queue_lock:
            self.queue.append((delay, 1, function, ()))

    def _schedule_in_scheduler(self):
        with self.queue_lock:
            for event in self.queue:
                self.scheduler.enter(*event)
                print("Registered event", event)
            self.queue = []
        self.scheduler.enter(1, 1, self._schedule_in_scheduler, ())

First, I'd create a thread class that has its own scheduler and a queue. At least one event is registered in the scheduler: one that invokes the method for scheduling events from the queue.

class SchedulingThread(threading.Thread):
    def __init__(self):
        threading.Thread.__init__(self)
        self.scheduler = sched.scheduler(time.time, time.sleep)
        self.queue = []
        self.queue_lock = threading.Lock()
        self.scheduler.enter(1, 1, self._schedule_in_scheduler, ())

The method for scheduling events from the queue locks the queue, schedules each event, empties the queue, and schedules itself again to look for new events some time in the future. Note that the period for looking for new events is short (one second); you may change it:

    def _schedule_in_scheduler(self):
        with self.queue_lock:
            for event in self.queue:
                self.scheduler.enter(*event)
                print("Registered event", event)
            self.queue = []
        self.scheduler.enter(1, 1, self._schedule_in_scheduler, ())

The class should also have a method for scheduling user events. Naturally, this method should lock the queue while updating it:

    def schedule(self, function, delay):
        with self.queue_lock:
            self.queue.append((delay, 1, function, ()))

Finally, the class should invoke the scheduler main method:

    def run(self):
        self.scheduler.run()

Here is an example of its use:

def print_time():
    print("scheduled:", time.time())


if __name__ == "__main__":
    st = SchedulingThread()
    st.start()
    st.schedule(print_time, 10)

    while True:
        print("main thread:", time.time())
        time.sleep(5)

    st.join()  # unreachable in this toy example, since the loop above never ends

Its output on my machine is:

$ python schedthread.py
main thread: 1311089765.77
Registered event (10, 1, <function print_time at 0x2f4bb0>, ())
main thread: 1311089770.77
main thread: 1311089775.77
scheduled: 1311089776.77
main thread: 1311089780.77
main thread: 1311089785.77

This code is just a quick'n'dirty example and may need some work. However, I have to confess that I am a bit fascinated by the sched module, which is why I suggested it. You may want to look for other suggestions as well :)

APScheduler

Searching Google for solutions like the one I posted, I found this amazing APScheduler module. It is so practical and useful that I bet it is your solution. My previous example would be way simpler with this module:

from apscheduler.scheduler import Scheduler  # APScheduler 2.x import path
import time

sch = Scheduler()
sch.start()

@sch.interval_schedule(seconds=10)
def print_time():
    print("scheduled:", time.time())
    sch.unschedule_func(print_time)  # run once, then remove ourselves

while True:
    print("main thread:", time.time())
    time.sleep(5)

(Unfortunately, I did not find out how to schedule an event to execute only once, so the scheduled function has to unschedule itself. I bet this can be solved with some decorator.)
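For what it's worth, the same 2.x-era API appears to offer one-shot scheduling via add_date_job; a minimal sketch, assuming that method is available in your APScheduler version:

from datetime import datetime, timedelta
from apscheduler.scheduler import Scheduler
import time

sch = Scheduler()
sch.start()

def print_time():
    print("scheduled:", time.time())

# assumption: add_date_job runs the callable exactly once at the given datetime
sch.add_date_job(print_time, datetime.now() + timedelta(seconds=10))

while True:
    print("main thread:", time.time())
    time.sleep(5)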

橘寄 2024-12-01 16:10:27

如果您有一堆任务需要稍后执行,并且您希望即使您关闭调用程序或您的工作人员,它们也能持续存在,那么您确实应该研究 Celery,这使得创建新任务变得非常容易,让它们在您想要的任何机器上执行,然后等待结果。

在 Celery 页面中,“这是一个添加两个数字的简单任务:”

from celery.task import task

@task
def add(x, y):
    return x + y

您可以在后台执行该任务,或者等待它完成:

>>> result = add.delay(8, 8)
>>> result.wait() # wait for and return the result
16

If you have a bunch of tasks that need to get performed later, and you want them to persist even if you shut down the calling program or your workers, you should really look into Celery, which makes it super easy to create new tasks, have them executed on any machine you'd like, and wait for the results.

From the Celery page, "This is a simple task adding two numbers:"

from celery.task import task

@task
def add(x, y):
    return x + y

You can execute the task in the background, or wait for it to finish:

>>> result = add.delay(8, 8)
>>> result.wait() # wait for and return the result
16
○闲身 2024-12-01 16:10:27

You wrote:

one of the problem is that the process uses zeromq for communication so I would need some integration (eventlet already has it)

Seems like your choice will be heavily influenced by these details, which are a bit unclear: how zeromq is being used for communication, how many resources the integration will require, and what your requirements and available resources are.


There's a project called django-ztask which uses zeromq and provides a task decorator similar to celery's. However, it is (obviously) Django-specific and so may not be suitable in your case. I haven't used it; I prefer celery myself.

I've been using celery for a couple of projects (these are hosted at ep.io PaaS hosting, which provides an easy way to use it).

Celery looks like quite a flexible solution, allowing delayed tasks, callbacks, task expiration and retrying, limiting the task execution rate, etc. It can be used with Redis, Beanstalk, CouchDB, MongoDB, or an SQL database.

Example code (definition of a task, and asynchronous execution after a delay):

from celery.decorators import task

@task
def my_task(arg1, arg2):
    pass # Do something

result = my_task.apply_async(
    args=[sth1, sth2], # Arguments that will be passed to `my_task()` function.
    countdown=3, # Time in seconds to wait before executing the task.
)

See also a section in celery docs.

空心空情空意 2024-12-01 16:10:27

Have you looked at the multiprocessing module? It comes standard with Python. It is similar to the threading module, but runs each task in a process. You can use a Pool() object to set up a worker pool, then use the .map() method to call a function with the various queued task arguments.
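A minimal sketch of that idea; the delay handling is my own addition (multiprocessing has no built-in scheduling), done here by passing each task's due time along with its payload:

import time
from multiprocessing import Pool

def run_later(task):
    """worker function: wait until the task's due time, then do the work."""
    due, message = task
    delay = due - time.time()
    if delay > 0:
        time.sleep(delay)  # each worker process sleeps independently
    return "%.2f %s" % (time.time(), message)

if __name__ == '__main__':
    now = time.time()
    # (due_time, payload) pairs; the due times are hypothetical
    tasks = [(now + 5, 'Happy'), (now + 10, 'Grumpy'), (now + 15, 'Bashful')]
    with Pool(4) as pool:
        for line in pool.map(run_later, tasks):
            print(line)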

一口甜 2024-12-01 16:10:27

Pyzmq has an ioloop implementation with an API similar to that of the tornado ioloop. It implements a DelayedCallback, which may help you.
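A minimal sketch of that idea, assuming the older pyzmq releases that still ship DelayedCallback (it appears to take the delay in milliseconds, mirroring tornado's PeriodicCallback; newer pyzmq versions removed it):

from zmq.eventloop.ioloop import IOLoop, DelayedCallback

def do_work():
    print("doing the delayed work")

loop = IOLoop.instance()

# fire do_work once, 10 seconds (10000 ms) from now
dc = DelayedCallback(do_work, 10000, io_loop=loop)
dc.start()

loop.start()  # the same loop can keep polling your zeromq sockets meanwhile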

等数载,海棠开 2024-12-01 16:10:27

Presuming your process has a run loop that can receive signals, and that the length of each action is within the bounds of sequential operation, use signals and the POSIX alarm()

    signal.alarm(time)
If time is non-zero, this function requests that a 
SIGALRM signal be sent to the process in time seconds. 

This depends on what you mean by "those "later" actions can be a lot" and on whether your process already uses signals. Due to the phrasing of the question, it's unclear why an external Python package would be needed.
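A minimal sketch of that idea, assuming a POSIX system; it keeps its own heap of due times because alarm() can hold only one pending timer at a time:

import heapq, signal, time

pending = []  # heap of (due_time, message)

def handle_alarm(signum, frame):
    now = time.time()
    while pending and pending[0][0] <= now:
        _, message = heapq.heappop(pending)
        print(time.time(), message)  # the actual work goes here
    if pending:
        # re-arm for the next due task; alarm() only takes whole seconds
        signal.alarm(max(1, int(pending[0][0] - now)))

def do_later(delay, message):
    heapq.heappush(pending, (time.time() + delay, message))
    signal.alarm(max(1, int(pending[0][0] - time.time())))  # (re)set the single timer

signal.signal(signal.SIGALRM, handle_alarm)
do_later(5, 'Happy')
do_later(10, 'Grumpy')

while True:
    signal.pause()  # the "run loop": sleep until any signal arrives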

凯凯我们等你回来 2024-12-01 16:10:27

Another option is to use the Python GLib bindings, in particular its timeout functions.

It's a good choice as long as you don't want to make use of multiple cores and as long as the dependency on GLib is no problem. It handles all events in the same thread, which prevents synchronization issues. Additionally, its event framework can also be used to watch and handle IO-based (i.e. socket) events.

UPDATE:

Here's a live session using GLib:

>>> import time
>>> import glib
>>> 
>>> def workon(thing):
...     print("%s: working on %s" % (time.time(), thing))
...     return True # use True for repetitive and False for one-time tasks
... 
>>> ml = glib.MainLoop()
>>> 
>>> glib.timeout_add(1000, workon, "this")
2
>>> glib.timeout_add(2000, workon, "that")
3
>>> 
>>> ml.run()
1311343177.61: working on this
1311343178.61: working on that
1311343178.61: working on this
1311343179.61: working on this
1311343180.61: working on this
1311343180.61: working on that
1311343181.61: working on this
1311343182.61: working on this
1311343182.61: working on that
1311343183.61: working on this
后eg是否自 2024-12-01 16:10:27

Well, in my opinion you could use something called "cooperative multitasking". It's a Twisted-based thing and it's really cool. Just look at the PyCon presentation from 2010: http://blip.tv/pycon-us-videos-2009-2010-2011/pycon-2010-cooperative-multitasking-with-twisted-getting-things-done-concurrently-11-3352182

You will need a transport queue to do this too...
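For the "run this later" part specifically, Twisted's reactor already provides a timer; a minimal sketch using reactor.callLater (the task payloads are made up):

from twisted.internet import reactor

def do_work(message):
    print("working on", message)

# schedule many cheap timed calls; each is just an entry in the
# reactor's timer list, not a thread
for i, delay in enumerate(range(10, 61, 10)):
    reactor.callLater(delay, do_work, "task-%d" % i)

reactor.run()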

妄司 2024-12-01 16:10:27

Simple. You can inherit your class from Thread and create instances of it with a parameter such as timeout, so each instance of your class gets a timeout that makes its thread wait for that time.
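A minimal sketch of that suggestion (the class and its names are made up); note that this still creates one thread per task, which the question rules out at large scale:

import threading, time

class DelayedTask(threading.Thread):
    """runs `function` once, `timeout` seconds after start() is called."""

    def __init__(self, timeout, function, *args):
        threading.Thread.__init__(self)
        self.timeout = timeout
        self.function = function
        self.args = args

    def run(self):
        time.sleep(self.timeout)  # wait for the requested time
        self.function(*self.args)

DelayedTask(10, print, "ten seconds later").start()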
