Yet another producer/consumer problem in Twisted Python

Posted 2024-10-19 15:58:08

I am building a server which stores key/value data on top of Redis using Twisted Python.
The server receives a JSON dictionary via HTTP, which is converted into a Python dictionary and put in a buffer. Every time new data is stored, the server schedules a task which pops one dictionary from the buffer and writes every key/value pair into a Redis instance, using a txredis client.

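```python
import json
from collections import deque

from twisted.internet import defer, protocol, reactor
from twisted.python import log
from twisted.web.resource import Resource
# Redis is the txredis protocol class; its import path depends on the txredis version.


class Datastore(Resource):

    isLeaf = True

    def __init__(self):
        Resource.__init__(self)
        self.redis = None  # replaced by setRedis once the connection is up
        self.buffer = deque()
        self.clientCreator = protocol.ClientCreator(reactor, Redis)
        d = self.clientCreator.connectTCP(...)
        d.addCallback(self.setRedis)

    def setRedis(self, redis):
        # Assumed (not shown in the question): keep the connected protocol.
        self.redis = redis

    def render_POST(self, request):
        # getRawHeaders returns its default (None) for a missing header, so
        # pass [] to make [0] raise the IndexError handled below.
        try:
            task_id = request.requestHeaders.getRawHeaders('x-task-id', [])[0]
        except IndexError:
            request.setResponseCode(503)
            return '<html><body>Error reading task_id</body></html>'

        data = json.loads(request.content.read())
        self.buffer.append((task_id, data))
        reactor.callLater(0, self.write_on_redis)
        return ' '

    @defer.inlineCallbacks
    def write_on_redis(self):
        try:
            # popleft() takes the oldest task; pop() would take the newest.
            task_id, dic = self.buffer.popleft()
            log.msg('Buffer: %s' % len(self.buffer))
        except IndexError:
            log.msg('buffer empty')
            defer.returnValue(1)

        m = yield self.redis.sismember('DONE', task_id)
        # Simple check
        if m == '1':
            log.msg('%s already stored' % task_id)
        else:
            log.msg('%s unpacking' % task_id)
            s = yield self.redis.sadd('DONE', task_id)

            # Each push is chained as a callback on the same Deferred d.
            d = defer.Deferred()
            for k, v in dic.iteritems():
                k = k.encode()
                d.addCallback(self.redis.push, k, v)

            d.callback(None)
```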

Basically, I am facing a producer/consumer problem between two different connections, but I am not sure that the current implementation works well in the Twisted paradigm.
I have read the short documentation about the producer/consumer interfaces in Twisted, but I am not sure whether I can use them in my case.
Any criticism is welcome: I am trying to get a grasp of event-driven programming after too many years of thread-based concurrency.


Answer by 寻找我们的幸福, 2024-10-26 15:58:08

The producer and consumer APIs in Twisted, IProducer and IConsumer, are about flow control. You don't seem to have any flow control here, you're just relaying messages from one protocol to another.

Since there's no flow control, the buffer is just extra complexity. You could get rid of it by just passing the data directly to the write_on_redis method. This way write_on_redis doesn't need to handle the empty buffer case, you don't need the extra attribute on the resource, and you can even get rid of the callLater (although you can also do this even if you keep the buffer).

I don't know if any of this answers your question, though. As far as whether this approach "works well", here are the things I notice just by reading the code:

  • If data arrives faster than Redis accepts it, your list of outstanding jobs may become arbitrarily large, causing you to run out of memory. This is what flow control would help with.
  • With no error handling around the sismember call or the sadd call, you may lose tasks if either of these fails, since you've already popped them from the work buffer.
  • Doing a push as a callback on that Deferred d also means that any failed push will prevent the rest of the data from being pushed. It also passes the result of the Deferred returned by push (I'm assuming it returns a Deferred) as the first argument to the next call, so unless push more or less ignores its first argument, you won't be pushing the right data to Redis.

If you want to implement flow control, then you need to have your HTTP server check the length of self.buffer and possibly reject the new task: don't add it to self.buffer, and return some error code to the client instead. You still won't be using IConsumer and IProducer, but it's sort of similar.
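The admission check itself does not depend on Twisted, so here is a minimal sketch in plain Python. `PendingTasks`, `handle_post` and the `MAX_PENDING` value are hypothetical names chosen for illustration. `render_POST` would call `handle_post`, and on 503 it would set that response code and skip scheduling a write:

```python
from collections import deque

MAX_PENDING = 1000  # hypothetical limit; tune to how fast Redis keeps up


class PendingTasks(object):
    """Hypothetical bounded buffer that refuses work instead of growing."""

    def __init__(self, limit=MAX_PENDING):
        self.limit = limit
        self.buffer = deque()

    def offer(self, task_id, data):
        # Reject before queueing, so at most `limit` tasks are ever buffered.
        if len(self.buffer) >= self.limit:
            return False
        self.buffer.append((task_id, data))
        return True

    def take(self):
        # Called by the Redis writer: oldest task first, None when empty.
        return self.buffer.popleft() if self.buffer else None


def handle_post(pending, task_id, data):
    # What render_POST would decide: 200 if queued, 503 if the buffer is full.
    return 200 if pending.offer(task_id, data) else 503
```

Answering 503 Service Unavailable lets clients back off and retry later, rather than having the server buffer without bound.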
