将来自 Twisted `enterprise.adbapi` 的查询添加到由 `twistd` 守护进程创建的反应器循环中

发布于 2024-12-11 17:13:27 字数 3735 浏览 0 评论 0原文

我在 Twisted .tac 插件中使用 twisted.enterprise.adbapi,并且发现为诸如 aConnectionPool.runQuery(sqlQuery) 等函数返回的延迟对象) 除非调用 reactor.(run) ,否则不会触发。如何将查询添加到由 twistd 创建的反应器循环中,而不是调用 reactor.run()?它是一个通用过程还是特定于异步数据库 API 的过程?

编辑-附上代码:

from twisted.application import internet, service
from zope.interface import implements
from twisted.web.iweb import IBodyProducer

from twisted.internet import defer, protocol, reactor
from twisted.internet.defer import succeed
from twisted.web.client import Agent
from twisted.web.http_headers import Headers

import json
import base64
from twisted.enterprise import adbapi

class StringProducer(object):
    implements(IBodyProducer)

    def __init__(self, body):
        self.body = body
        self.length = len(body)

    def startProducing(self, consumer):
        consumer.write(self.body)
        return succeed(None)

    def pauseProducing(self):
        pass

    def stopProducing(self):
        pass

def httpRequest(url, values, headers={}, method='POST'):

    agent = Agent(reactor)
    d = agent.request(method,
                      url,
                      Headers(headers),
                      StringProducer(values)
                      )

    def handle_response(response):
        if response.code == 204:
            d = defer.succeed('')
        else:
            class SimpleReceiver(protocol.Protocol):
                def __init__(s, d):
                    s.buf = ''; s.d = d
                def dataReceived(s, data):
                    s.buf += data
                    response = json.loads(data)

                    receipt = response[u'receipt']
                    if receipt[u'product_id'] == "com.domain_name.app_name.a_product_id":
                        transactionID = receipt[u'original_transaction_id']
                        date = receipt[u'original_purchase_date']
                        purchaseDate = date.strip(' Etc/GMT')
                        print transactionID
                        print purchaseDate

                        dbpool = adbapi.ConnectionPool('MySQLdb', db='mydb', user='user',  passwd='passwd')
                        dOperation = dbpool.runOperation("insert into users(name, original_transaction_id, date_joined) values(%s, %s, %s)", ('testuser', transactionID, purchaseDate))

                        def finishInsert(dObject, pool):
                            print 'inserted!'
                            pool.close()
                        dOperation.addCallback(finishInsert, dbpool)

                        def insertError(dObject):
                            print 'insert error!'
                        dOperation.addErrback(insertError)

                def connectionLost(s, reason):
                    s.d.callback(s.buf)

            d = defer.Deferred()
            response.deliverBody(SimpleReceiver(d))
        return d

    d.addCallback(handle_response)

class StoreServer(protocol.Protocol):

    def dataReceived(self, data):
        a = data.split(':delimiter:')

        if a[0] == 'addToUserList':
            receiptBase64 = base64.standard_b64encode(a[1])
            jsonReceipt = json.dumps({'receipt-data':receiptBase64})

            httpRequest(
                        "https://buy.itunes.apple.com/verifyReceipt",
                        jsonReceipt,
                        {'Content-Type': ['application/x-www-form-urlencoded']}
                        )

application = service.Application("My Server")
storeFactory = protocol.Factory()
storeFactory.protocol = StoreServer
tcpStoreServer = internet.TCPServer(30000, storeFactory)
tcpStoreServer.setServiceParent(application)

I am using the twisted.enterprise.adbapi inside a Twisted .tac plugin, and am finding that the deferred object returned for functions such as aConnectionPool.runQuery(sqlQuery) are not firing unless reactor.(run) is called. How can I add the query to the reactor loop created by twistd instead of calling reactor.run()? Is it a general procedure or is it something specific to the asynchronous database API?

edit - attached the code:

from twisted.application import internet, service
from zope.interface import implements
from twisted.web.iweb import IBodyProducer

from twisted.internet import defer, protocol, reactor
from twisted.internet.defer import succeed
from twisted.web.client import Agent
from twisted.web.http_headers import Headers

import json
import base64
from twisted.enterprise import adbapi

class StringProducer(object):
    implements(IBodyProducer)

    def __init__(self, body):
        self.body = body
        self.length = len(body)

    def startProducing(self, consumer):
        consumer.write(self.body)
        return succeed(None)

    def pauseProducing(self):
        pass

    def stopProducing(self):
        pass

def httpRequest(url, values, headers={}, method='POST'):

    agent = Agent(reactor)
    d = agent.request(method,
                      url,
                      Headers(headers),
                      StringProducer(values)
                      )

    def handle_response(response):
        if response.code == 204:
            d = defer.succeed('')
        else:
            class SimpleReceiver(protocol.Protocol):
                def __init__(s, d):
                    s.buf = ''; s.d = d
                def dataReceived(s, data):
                    s.buf += data
                    response = json.loads(data)

                    receipt = response[u'receipt']
                    if receipt[u'product_id'] == "com.domain_name.app_name.a_product_id":
                        transactionID = receipt[u'original_transaction_id']
                        date = receipt[u'original_purchase_date']
                        purchaseDate = date.strip(' Etc/GMT')
                        print transactionID
                        print purchaseDate

                        dbpool = adbapi.ConnectionPool('MySQLdb', db='mydb', user='user',  passwd='passwd')
                        dOperation = dbpool.runOperation("insert into users(name, original_transaction_id, date_joined) values(%s, %s, %s)", ('testuser', transactionID, purchaseDate))

                        def finishInsert(dObject, pool):
                            print 'inserted!'
                            pool.close()
                        dOperation.addCallback(finishInsert, dbpool)

                        def insertError(dObject):
                            print 'insert error!'
                        dOperation.addErrback(insertError)

                def connectionLost(s, reason):
                    s.d.callback(s.buf)

            d = defer.Deferred()
            response.deliverBody(SimpleReceiver(d))
        return d

    d.addCallback(handle_response)

class StoreServer(protocol.Protocol):

    def dataReceived(self, data):
        a = data.split(':delimiter:')

        if a[0] == 'addToUserList':
            receiptBase64 = base64.standard_b64encode(a[1])
            jsonReceipt = json.dumps({'receipt-data':receiptBase64})

            httpRequest(
                        "https://buy.itunes.apple.com/verifyReceipt",
                        jsonReceipt,
                        {'Content-Type': ['application/x-www-form-urlencoded']}
                        )

application = service.Application("My Server")
storeFactory = protocol.Factory()
storeFactory.protocol = StoreServer
tcpStoreServer = internet.TCPServer(30000, storeFactory)
tcpStoreServer.setServiceParent(application)

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(1

燕归巢 2024-12-18 17:13:27

您的代码会为其发出的每个请求创建一个新的 ConnectionPool。新的 ConnectionPool 创建自己的新线程池来运行查询,并且必须建立与数据库的新连接。

这意味着您实际上没有连接池。您只需创建并使用一次即可拥有大量连接。此外,错误返回 insertError 不会关闭池。

这些因素结合起来意味着一次可以创建的线程/连接数量没有限制,除了系统对可以分配的内存量或可以打开的套接字数量施加的限制之外。当你遇到这些限制之一时,事情就不会那么美好了。

这也意味着每个查询错误都会泄漏一些线程和连接(ConnectionPool 在启动时设置 3 个线程/连接)。在发生足够多的错误后,您将无法创建更多线程或连接,因此您将无法再查询数据库。您的查询很简单,您可能认为错误不太可能发生,但 MySQL 倾向于随机地断开客户端连接(也许您至少在某种程度上意识到了这一点,因为您确实添加了errback 报告失败)。

ConnectionPool 的预期用途是创建一个(或两个,或其他一些小的固定数量),然后将其重新用于所有查询。我不知道这些问题是否与您最初观察到的问题有关,但它们可能是您应该解决的问题。

Your code makes a new ConnectionPool for every request it issues. The new ConnectionPool creates its own new thread pool to run queries in, and has to set up a new connection to the database.

This means you effectively have no connection pool. You just have a lot of connections that you create and use once. Also, the errback, insertError, does not close the pool.

These things combined mean that there is no limit on the number of threads/connections that might be created at once, except for the limit imposed by your system on how much memory you can allocate, or how many sockets you can open. When you run into one of those limits, things won't be pretty.

It also means that every query error leaks a few threads and connections (ConnectionPool sets up 3 threads/connections when it starts). After enough errors, you won't be able to create any more threads or connections, so you won't be able to query your database any more. Your query is simple, and you might think that errors aren't very likely, but MySQL tends to disconnect clients somewhat at random (and maybe you were at least somewhat aware of this, since you did add the errback to report the failure).

The intended use of a ConnectionPool is to create one (or two, or some other small, fixed number) and then re-use it for all of your queries. Whether these problems are related to the ones that you originally observed or not, I don't know, but they are probably problems you should fix.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文