将来自 Twisted `enterprise.adbapi` 的查询添加到由 `twistd` 守护进程创建的反应器循环中
我在 Twisted .tac
插件中使用 twisted.enterprise.adbapi
,并且发现为诸如 aConnectionPool.runQuery(sqlQuery) 等函数返回的延迟对象)
除非调用 reactor.(run)
,否则不会触发。如何将查询添加到由 twistd
创建的反应器循环中,而不是调用 reactor.run()
?它是一个通用过程还是特定于异步数据库 API 的过程?
编辑-附上代码:
from twisted.application import internet, service
from zope.interface import implements
from twisted.web.iweb import IBodyProducer
from twisted.internet import defer, protocol, reactor
from twisted.internet.defer import succeed
from twisted.web.client import Agent
from twisted.web.http_headers import Headers
import json
import base64
from twisted.enterprise import adbapi
class StringProducer(object):
implements(IBodyProducer)
def __init__(self, body):
self.body = body
self.length = len(body)
def startProducing(self, consumer):
consumer.write(self.body)
return succeed(None)
def pauseProducing(self):
pass
def stopProducing(self):
pass
def httpRequest(url, values, headers={}, method='POST'):
agent = Agent(reactor)
d = agent.request(method,
url,
Headers(headers),
StringProducer(values)
)
def handle_response(response):
if response.code == 204:
d = defer.succeed('')
else:
class SimpleReceiver(protocol.Protocol):
def __init__(s, d):
s.buf = ''; s.d = d
def dataReceived(s, data):
s.buf += data
response = json.loads(data)
receipt = response[u'receipt']
if receipt[u'product_id'] == "com.domain_name.app_name.a_product_id":
transactionID = receipt[u'original_transaction_id']
date = receipt[u'original_purchase_date']
purchaseDate = date.strip(' Etc/GMT')
print transactionID
print purchaseDate
dbpool = adbapi.ConnectionPool('MySQLdb', db='mydb', user='user', passwd='passwd')
dOperation = dbpool.runOperation("insert into users(name, original_transaction_id, date_joined) values(%s, %s, %s)", ('testuser', transactionID, purchaseDate))
def finishInsert(dObject, pool):
print 'inserted!'
pool.close()
dOperation.addCallback(finishInsert, dbpool)
def insertError(dObject):
print 'insert error!'
dOperation.addErrback(insertError)
def connectionLost(s, reason):
s.d.callback(s.buf)
d = defer.Deferred()
response.deliverBody(SimpleReceiver(d))
return d
d.addCallback(handle_response)
class StoreServer(protocol.Protocol):
def dataReceived(self, data):
a = data.split(':delimiter:')
if a[0] == 'addToUserList':
receiptBase64 = base64.standard_b64encode(a[1])
jsonReceipt = json.dumps({'receipt-data':receiptBase64})
httpRequest(
"https://buy.itunes.apple.com/verifyReceipt",
jsonReceipt,
{'Content-Type': ['application/x-www-form-urlencoded']}
)
application = service.Application("My Server")
storeFactory = protocol.Factory()
storeFactory.protocol = StoreServer
tcpStoreServer = internet.TCPServer(30000, storeFactory)
tcpStoreServer.setServiceParent(application)
I am using the twisted.enterprise.adbapi
inside a Twisted .tac
plugin, and am finding that the deferred object returned for functions such as aConnectionPool.runQuery(sqlQuery)
are not firing unless reactor.(run)
is called. How can I add the query to the reactor loop created by twistd
instead of calling reactor.run()
? Is it a general procedure or is it something specific to the asynchronous database API?
edit - attached the code:
from twisted.application import internet, service
from zope.interface import implements
from twisted.web.iweb import IBodyProducer
from twisted.internet import defer, protocol, reactor
from twisted.internet.defer import succeed
from twisted.web.client import Agent
from twisted.web.http_headers import Headers
import json
import base64
from twisted.enterprise import adbapi
class StringProducer(object):
implements(IBodyProducer)
def __init__(self, body):
self.body = body
self.length = len(body)
def startProducing(self, consumer):
consumer.write(self.body)
return succeed(None)
def pauseProducing(self):
pass
def stopProducing(self):
pass
def httpRequest(url, values, headers={}, method='POST'):
agent = Agent(reactor)
d = agent.request(method,
url,
Headers(headers),
StringProducer(values)
)
def handle_response(response):
if response.code == 204:
d = defer.succeed('')
else:
class SimpleReceiver(protocol.Protocol):
def __init__(s, d):
s.buf = ''; s.d = d
def dataReceived(s, data):
s.buf += data
response = json.loads(data)
receipt = response[u'receipt']
if receipt[u'product_id'] == "com.domain_name.app_name.a_product_id":
transactionID = receipt[u'original_transaction_id']
date = receipt[u'original_purchase_date']
purchaseDate = date.strip(' Etc/GMT')
print transactionID
print purchaseDate
dbpool = adbapi.ConnectionPool('MySQLdb', db='mydb', user='user', passwd='passwd')
dOperation = dbpool.runOperation("insert into users(name, original_transaction_id, date_joined) values(%s, %s, %s)", ('testuser', transactionID, purchaseDate))
def finishInsert(dObject, pool):
print 'inserted!'
pool.close()
dOperation.addCallback(finishInsert, dbpool)
def insertError(dObject):
print 'insert error!'
dOperation.addErrback(insertError)
def connectionLost(s, reason):
s.d.callback(s.buf)
d = defer.Deferred()
response.deliverBody(SimpleReceiver(d))
return d
d.addCallback(handle_response)
class StoreServer(protocol.Protocol):
def dataReceived(self, data):
a = data.split(':delimiter:')
if a[0] == 'addToUserList':
receiptBase64 = base64.standard_b64encode(a[1])
jsonReceipt = json.dumps({'receipt-data':receiptBase64})
httpRequest(
"https://buy.itunes.apple.com/verifyReceipt",
jsonReceipt,
{'Content-Type': ['application/x-www-form-urlencoded']}
)
application = service.Application("My Server")
storeFactory = protocol.Factory()
storeFactory.protocol = StoreServer
tcpStoreServer = internet.TCPServer(30000, storeFactory)
tcpStoreServer.setServiceParent(application)
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论
评论(1)
您的代码会为其发出的每个请求创建一个新的
ConnectionPool
。新的ConnectionPool
创建自己的新线程池来运行查询,并且必须建立与数据库的新连接。这意味着您实际上没有连接池。您只需创建并使用一次即可拥有大量连接。此外,错误返回
insertError
不会关闭池。这些因素结合起来意味着一次可以创建的线程/连接数量没有限制,除了系统对可以分配的内存量或可以打开的套接字数量施加的限制之外。当你遇到这些限制之一时,事情就不会那么美好了。
这也意味着每个查询错误都会泄漏一些线程和连接(
ConnectionPool
在启动时设置 3 个线程/连接)。在发生足够多的错误后,您将无法创建更多线程或连接,因此您将无法再查询数据库。您的查询很简单,您可能认为错误不太可能发生,但 MySQL 倾向于随机地断开客户端连接(也许您至少在某种程度上意识到了这一点,因为您确实添加了errback 报告失败)。ConnectionPool
的预期用途是创建一个(或两个,或其他一些小的固定数量),然后将其重新用于所有查询。我不知道这些问题是否与您最初观察到的问题有关,但它们可能是您应该解决的问题。Your code makes a new
ConnectionPool
for every request it issues. The newConnectionPool
creates its own new thread pool to run queries in, and has to set up a new connection to the database.This means you effectively have no connection pool. You just have a lot of connections that you create and use once. Also, the errback,
insertError
, does not close the pool.These things combined mean that there is no limit on the number of threads/connections that might be created at once, except for the limit imposed by your system on how much memory you can allocate, or how many sockets you can open. When you run into one of those limits, things won't be pretty.
It also means that every query error leaks a few threads and connections (
ConnectionPool
sets up 3 threads/connections when it starts). After enough errors, you won't be able to create any more threads or connections, so you won't be able to query your database any more. Your query is simple, and you might think that errors aren't very likely, but MySQL tends to disconnect clients somewhat at random (and maybe you were at least somewhat aware of this, since you did add the errback to report the failure).The intended use of a
ConnectionPool
is to create one (or two, or some other small, fixed number) and then re-use it for all of your queries. Whether these problems are related to the ones that you originally observed or not, I don't know, but they are probably problems you should fix.