Another Producer/Consumer problem in Twisted Python
I am building a server which stores key/value data on top of Redis using Twisted Python.
The server receives a JSON dictionary via HTTP, converts it into a Python dictionary and puts it in a buffer. Every time new data is stored, the server schedules a task which pops one dictionary from the buffer and writes every (key, value) tuple into a Redis instance, using a txredis client.
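```python
import json
from collections import deque

from twisted.internet import defer, protocol, reactor
from twisted.python import log
from twisted.web.resource import Resource
from txredis.protocol import Redis  # import path differs between txredis versions


class Datastore(Resource):
    isLeaf = True

    def __init__(self):
        Resource.__init__(self)
        self.clientCreator = protocol.ClientCreator(reactor, Redis)
        d = self.clientCreator.connectTCP(...)
        # setRedis (not shown) presumably stores the connected client on self.redis
        d.addCallback(self.setRedis)
        self.redis = None
        self.buffer = deque()

    def render_POST(self, request):
        try:
            # getRawHeaders returns its default (None unless given) when the
            # header is missing, so pass [] to make that case raise IndexError
            task_id = request.requestHeaders.getRawHeaders('x-task-id', [])[0]
        except IndexError:
            request.setResponseCode(503)
            return '<html><body>Error reading task_id</body></html>'
        data = json.loads(request.content.read())
        # queue the task and schedule a write on the next reactor iteration
        self.buffer.append((task_id, data))
        reactor.callLater(0, self.write_on_redis)
        return ' '

    @defer.inlineCallbacks
    def write_on_redis(self):
        try:
            task_id, dic = self.buffer.pop()
            log.msg('Buffer: %s' % len(self.buffer))
        except IndexError:
            log.msg('buffer empty')
            defer.returnValue(1)
        m = yield self.redis.sismember('DONE', task_id)
        # Simple check
        if m == '1':
            log.msg('%s already stored' % task_id)
        else:
            log.msg('%s unpacking' % task_id)
            s = yield self.redis.sadd('DONE', task_id)
            d = defer.Deferred()
            for k, v in dic.iteritems():
                k = k.encode()
                # each push is added as a callback on d, so they run in sequence
                d.addCallback(self.redis.push, k, v)
            d.callback(None)
```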
Basically, I am facing a producer/consumer problem between two different connections, but I am not sure that the current implementation works well in the Twisted paradigm.
I have read the little documentation there is about the producer/consumer interfaces in Twisted, but I am not sure whether I can use them in my case.
Any criticism is welcome: I am trying to get a grasp of event-driven programming after too many years of thread-based concurrency.
The producer and consumer APIs in Twisted, `IProducer` and `IConsumer`, are about flow control. You don't seem to have any flow control here; you're just relaying messages from one protocol to another.

Since there's no flow control, the buffer is just extra complexity. You could get rid of it by passing the data directly to the `write_on_redis` method. This way `write_on_redis` doesn't need to handle the empty-buffer case, you don't need the extra attribute on the resource, and you can even get rid of the `callLater` (although you can also do this even if you keep the buffer).

I don't know if any of this answers your question, though. As far as whether this approach "works well" goes, here are the things I notice just by reading the code:
- There's no error handling on the `sismember` call or the `sadd` call. You may lose tasks if either of these fails, since you've already popped them from the work buffer.
- Doing the pushes as callbacks on a `Deferred` `d` also means that any failed push will prevent the rest of the data from being pushed. It also passes the previous result as the first argument to each `push`: the first call receives the `None` from `d.callback(None)`, and each later call receives the result of the `Deferred` returned by the previous `push` (I'm assuming it returns a `Deferred`). So unless `push` more or less ignores its first argument, you won't be pushing the right data to Redis.

If you want to implement flow control, then you need to have your HTTP server check the length of `self.buffer` and possibly reject the new task: don't add it to `self.buffer`, and return some error code to the client instead. You still won't be using `IConsumer` and `IProducer`, but it's sort of similar.