Python: deferToThread XMLRPC server - Twisted - CherryPy?

Posted on 2024-08-21 00:34:49


This question is related to others I have asked on here, mainly regarding sorting huge sets of data in memory.

Basically this is what I want / have:

A Twisted XMLRPC server is running. The server keeps several (32) instances of a Foo class in memory. Each Foo instance contains a list, bar (which will hold several million records). There is a service that retrieves data from a database and passes it to the XMLRPC server. The data is basically a dictionary whose keys correspond to each Foo instance and whose values are lists of dictionaries, like so:

data = {'foo1':[{'k1':'v1', 'k2':'v2'}, {'k1':'v1', 'k2':'v2'}], 'foo2':...}

Each Foo instance is then passed the value corresponding to its key, and its Foo.bar list is updated and sorted.

from twisted.web import xmlrpc
from twisted.internet import threads

class XMLRPCController(xmlrpc.XMLRPC):

    def __init__(self):
        ...
        self.foos = {'foo1':Foo(), 'foo2':Foo(), 'foo3':Foo()}
        ...

    def update(self, data):
        # iterate key/value pairs; a bare `for k, v in data` only
        # iterates the keys and raises a ValueError on unpacking
        for k, v in data.items():
            threads.deferToThread(self.foos[k].processData, v)

    def getData(self, fookey):
        # return first 10 records of specified Foo.bar
        return self.foos[fookey].bar[0:10]

class Foo(object):

    def __init__(self):
        self.bar = []   # `bar = []` alone would only create a local variable

    def processData(self, new_bar_data):
        for record in new_bar_data:
            # do processing, and add record, then sort
            # BUNCH OF PROCESSING CODE
            self.bar.sort(reverse=True)

The problem is that when the update function is called on the XMLRPCController with a lot of records (say 100K+), it stops responding to my getData calls until all 32 Foo instances have completed the processData method. I thought deferToThread would work, but I think I am misunderstanding where the problem is.

Any suggestions? I am open to using something else, like CherryPy, if it supports the required behavior.


EDIT

@Troy: This is how the reactor is set up

reactor.listenTCP(port_no, server.Site(XMLRPCController()))
reactor.run()

As far as the GIL goes, would it be a viable option to set sys.setcheckinterval() to a smaller value, so that the interpreter lock is released more often and the data can be read?

Comments (2)

回梦 2024-08-28 00:34:49


The easiest way to make the app responsive is to break the CPU-intensive processing into smaller chunks, letting the Twisted reactor run in between — for example, by calling reactor.callLater(0, process_next_chunk) to advance to the next chunk. You are effectively implementing cooperative multitasking yourself.

Another way would be to use separate processes to do the work; then you also benefit from multiple cores. Take a look at Ampoule: https://launchpad.net/ampoule — it provides an API similar to deferToThread.
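The chunking idea above can be sketched as follows. To keep this runnable without Twisted, a simple deque stands in for the reactor's call queue; in real code you would replace `pending.append(...)` with `reactor.callLater(0, ...)` and let the reactor drive the calls (the doubling step is a placeholder for the question's real processing):

```python
from collections import deque

# Stand-in for the Twisted reactor's pending-call queue.
# Real code: reactor.callLater(0, process_next_chunk) instead of append.
pending = deque()

def process_in_chunks(records, results, chunk_size=1000):
    """Process `records` one chunk at a time, yielding control between chunks."""
    def process_next_chunk(start=0):
        for record in records[start:start + chunk_size]:
            results.append(record * 2)   # placeholder for real processing
        if start + chunk_size < len(records):
            # Re-schedule ourselves; other events (e.g. getData calls)
            # get a chance to run between chunks.
            pending.append(lambda: process_next_chunk(start + chunk_size))
    pending.append(lambda: process_next_chunk())

results = []
process_in_chunks(list(range(5000)), results, chunk_size=1000)
while pending:            # drive the fake reactor until idle
    pending.popleft()()
print(len(results))       # 5000
```

Because each chunk returns to the event loop before the next is scheduled, getData calls interleave with the processing instead of waiting for all of it to finish.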

手长情犹 2024-08-28 00:34:49


I don't know how long your processData method runs, nor how you're setting up your Twisted reactor. By default, the reactor's thread pool has between 0 and 10 threads. You may be trying to defer as many as 32 long-running calculations to at most 10 threads, which is sub-optimal.
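If you stay with deferToThread, the pool ceiling mentioned above is adjustable via the reactor. A minimal sketch (note this won't buy CPU parallelism for pure-Python work, since the GIL still serializes it; it mainly avoids jobs queuing behind one another):

```python
from twisted.internet import reactor

# Raise the reactor's thread pool maximum (default is 10) so that
# 32 simultaneous deferToThread jobs each get a thread.
# Call this before the pool is first used.
reactor.suggestThreadPoolSize(32)
```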

You also need to ask what role the GIL is playing in updating all these collections.

Edit:
Before you make any serious changes to your program (like calling sys.setcheckinterval()), you should probably run it under the profiler or the Python trace module. These should tell you which methods are using all your time. Without the right information, you can't make the right changes.
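Profiling the suspected hot path is cheap with the stdlib. A minimal sketch using cProfile; the `process_data` function below is a stand-in mimicking the question's sort-inside-the-loop pattern, not the asker's actual code:

```python
import cProfile
import io
import pstats

def process_data(records):
    """Stand-in for the question's processData: append then re-sort each time."""
    bar = []
    for r in records:
        bar.append(r)
        bar.sort(reverse=True)   # suspected hot spot: sorting inside the loop
    return bar

profiler = cProfile.Profile()
profiler.enable()
process_data(list(range(2000)))
profiler.disable()

# Print functions ordered by cumulative time to find where time is spent.
stream = io.StringIO()
pstats.Stats(profiler, stream=stream).sort_stats("cumulative").print_stats()
report = stream.getvalue()
print(report)
```

The report will show the cumulative cost of `list.sort` relative to everything else, which tells you whether sorting (rather than, say, the XML-RPC layer) is actually the bottleneck before you start tuning interpreter internals.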
