在 Tornado 中处理 Redis 连接的正确方法是什么? (异步 - 发布/订阅)

发布于 2024-12-20 22:20:55 字数 2649 浏览 2 评论 0原文

我将 Redis 与我的 Tornado 应用程序和异步客户端 Brukva 一起使用,当我查看 Brukva 网站上的示例应用程序时,它们正在 websocket 中的“init”方法上建立新连接,

class MessagesCatcher(tornado.websocket.WebSocketHandler):
    def __init__(self, *args, **kwargs):
        super(MessagesCatcher, self).__init__(*args, **kwargs)
        self.client = brukva.Client()
        self.client.connect()
        self.client.subscribe('test_channel')

    def open(self):
        self.client.listen(self.on_message)

    def on_message(self, result):
        self.write_message(str(result.body))

    def close(self):
        self.client.unsubscribe('test_channel')
        self.client.disconnect()

在 websocket 的情况下效果很好但是如何在常见的Tornado RequestHandler post方法中处理它说长轮询操作(发布-订阅模型)。我正在更新处理程序的每个帖子方法中建立新的客户端连接,这是正确的方法吗?当我检查 redis 控制台时,我发现每次新的后期操作中客户端都会增加。

在此处输入图像描述

这是我的代码示例。

c = brukva.Client(host = '127.0.0.1')
c.connect()

class MessageNewHandler(BaseHandler):
    @tornado.web.authenticated
    def post(self):

        self.listing_id = self.get_argument("listing_id")
        message = {
            "id": str(uuid.uuid4()),
            "from": str(self.get_secure_cookie("username")),
            "body": str(self.get_argument("body")),
        }
        message["html"] = self.render_string("message.html", message=message)

        if self.get_argument("next", None):
            self.redirect(self.get_argument("next"))
        else:
            c.publish(self.listing_id, message)
            logging.info("Writing message : " + json.dumps(message))
            self.write(json.dumps(message))

    class MessageUpdatesHandler(BaseHandler):
        @tornado.web.authenticated
        @tornado.web.asynchronous
        def post(self):
            self.listing_id = self.get_argument("listing_id", None)
            self.client = brukva.Client()
            self.client.connect()
            self.client.subscribe(self.listing_id)
            self.client.listen(self.on_new_messages)

        def on_new_messages(self, messages):
            # Closed client connection
            if self.request.connection.stream.closed():
                return
            logging.info("Getting update : " + json.dumps(messages.body))
            self.finish(json.dumps(messages.body))
            self.client.unsubscribe(self.listing_id)


        def on_connection_close(self):
            # unsubscribe user from channel
            self.client.unsubscribe(self.listing_id)
            self.client.disconnect()

如果您提供一些类似案例的示例代码,我将不胜感激。

I am using Redis along with my Tornado application with asyc client Brukva, when I looked at the sample apps at Brukva site they are making new connection on "init" method in websocket

class MessagesCatcher(tornado.websocket.WebSocketHandler):
    def __init__(self, *args, **kwargs):
        super(MessagesCatcher, self).__init__(*args, **kwargs)
        self.client = brukva.Client()
        self.client.connect()
        self.client.subscribe('test_channel')

    def open(self):
        self.client.listen(self.on_message)

    def on_message(self, result):
        self.write_message(str(result.body))

    def close(self):
        self.client.unsubscribe('test_channel')
        self.client.disconnect()

its fine in the case of websocket but how to handle it in the common Tornado RequestHandler post method say long polling operation (publish-subscribe model). I am making new client connetion in every post method of update handler is this the right approach ?? When I checked at the redis console I see that clients increasing in every new post operation.

enter image description here

Here is a sample of my code.

c = brukva.Client(host = '127.0.0.1')
c.connect()

class MessageNewHandler(BaseHandler):
    @tornado.web.authenticated
    def post(self):

        self.listing_id = self.get_argument("listing_id")
        message = {
            "id": str(uuid.uuid4()),
            "from": str(self.get_secure_cookie("username")),
            "body": str(self.get_argument("body")),
        }
        message["html"] = self.render_string("message.html", message=message)

        if self.get_argument("next", None):
            self.redirect(self.get_argument("next"))
        else:
            c.publish(self.listing_id, message)
            logging.info("Writing message : " + json.dumps(message))
            self.write(json.dumps(message))

    class MessageUpdatesHandler(BaseHandler):
        @tornado.web.authenticated
        @tornado.web.asynchronous
        def post(self):
            self.listing_id = self.get_argument("listing_id", None)
            self.client = brukva.Client()
            self.client.connect()
            self.client.subscribe(self.listing_id)
            self.client.listen(self.on_new_messages)

        def on_new_messages(self, messages):
            # Closed client connection
            if self.request.connection.stream.closed():
                return
            logging.info("Getting update : " + json.dumps(messages.body))
            self.finish(json.dumps(messages.body))
            self.client.unsubscribe(self.listing_id)


        def on_connection_close(self):
            # unsubscribe user from channel
            self.client.unsubscribe(self.listing_id)
            self.client.disconnect()

I appreciate if you provide some sample code for similar case.

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(2

私野 2024-12-27 22:20:55

有点晚了,但是,我一直在使用 tornado-redis。它与tornado的ioloop和tornado.gen模块一起使用

安装tornadoredis

它可以通过pip

pip install tornadoredis

或setuptools

easy_install tornadoredis

安装,但你真的不应该这样做。您还可以克隆存储库并提取它。然后运行

python setup.py build
python setup.py install

连接到redis

以下代码放入您的main.py或等效的

redis_conn = tornadoredis.Client('hostname', 'port')
redis_conn.connect()

redis.connect中仅被调用一次。这是一个阻塞调用,因此应该在启动主 ioloop 之前调用它。所有处理程序之间共享相同的连接对象。

您可以将其添加到您的应用程序设置中,例如

settings = {
    redis = redis_conn
}
app = tornado.web.Application([('/.*', Handler),],
                              **settings)

使用tornadoredis

连接可以在处理程序中用作self.settings['redis'],也可以作为属性添加BaseHandler 类。您的请求处理程序是该类的子类并访问该属性。

class BaseHandler(tornado.web.RequestHandler):

    @property
    def redis():
        return self.settings['redis']

为了与 Redis 通信,使用了 tornado.web.asynchronoustornado.gen.engine 装饰器

class SomeHandler(BaseHandler):

    @tornado.web.asynchronous
    @tornado.gen.engine
    def get(self):
        foo = yield gen.Task(self.redis.get, 'foo')
        self.render('sometemplate.html', {'foo': foo}

额外信息

更多示例和其他功能,例如连接池和管道可以在 github 存储库中找到。

A little late but, I've been using tornado-redis. It works with tornado's ioloop and the tornado.gen module

Install tornadoredis

It can be installed from pip

pip install tornadoredis

or with setuptools

easy_install tornadoredis

but you really shouldn't do that. You could also clone the repository and extract it. Then run

python setup.py build
python setup.py install

Connect to redis

The following code goes in your main.py or equivalent

redis_conn = tornadoredis.Client('hostname', 'port')
redis_conn.connect()

redis.connect is called only once. It is a blocking call, so it should be called before starting the main ioloop. The same connection object is shared between all the handlers.

You could add it to your application settings like

settings = {
    redis = redis_conn
}
app = tornado.web.Application([('/.*', Handler),],
                              **settings)

Use tornadoredis

The connection can be used in handlers as self.settings['redis'] or it can be added as a property of the BaseHandler class. Your request handlers subclass that class and access the property.

class BaseHandler(tornado.web.RequestHandler):

    @property
    def redis():
        return self.settings['redis']

To communicate with redis, the tornado.web.asynchronous and the tornado.gen.engine decorators are used

class SomeHandler(BaseHandler):

    @tornado.web.asynchronous
    @tornado.gen.engine
    def get(self):
        foo = yield gen.Task(self.redis.get, 'foo')
        self.render('sometemplate.html', {'foo': foo}

Extra information

More examples and other features like connection pooling and pipelines can be found at the github repo.

在风中等你 2024-12-27 22:20:55

您应该在应用程序中汇集连接。因为 brukva 似乎不自动支持这一点(redis-py 支持这一点,但本质上是阻塞的,所以它与龙卷风不兼容),所以您需要编写自己的连接池。

不过,模式非常简单。沿着这些思路(这不是真正的操作代码):

class BrukvaPool():

    __conns = {}


    def get(host, port,db):
        ''' Get a client for host, port, db '''

        key = "%s:%s:%s" % (host, port, db)

        conns = self.__conns.get(key, [])
        if conns:
            ret = conns.pop()
            return ret
        else:
           ## Init brukva client here and connect it

    def release(client):
        ''' release a client at the end of a request '''
        key = "%s:%s:%s" % (client.connection.host, client.connection.port, client.connection.db)
        self.__conns.setdefault(key, []).append(client)

它可能有点棘手,但这就是主要思想。

you should pool the connections in your app. since it seems like brukva doesn't support this automatically (redis-py supports this, but is blocking by nature so it doesn't go well with tornado), you need to write your own connection pool.

the pattern is pretty simple, though. something along these lines (this is not real operational code):

class BrukvaPool():

    __conns = {}


    def get(host, port,db):
        ''' Get a client for host, port, db '''

        key = "%s:%s:%s" % (host, port, db)

        conns = self.__conns.get(key, [])
        if conns:
            ret = conns.pop()
            return ret
        else:
           ## Init brukva client here and connect it

    def release(client):
        ''' release a client at the end of a request '''
        key = "%s:%s:%s" % (client.connection.host, client.connection.port, client.connection.db)
        self.__conns.setdefault(key, []).append(client)

it can be a bit more tricky, but that's the main idea.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文