使用 Tornado 和 Pika 进行异步队列监控

发布于 2024-10-08 12:11:34 字数 2551 浏览 0 评论 0 原文

我有一个 AMQP 服务器 (RabbitMQ),我想在 Tornado 网络服务器。为此,我想我会使用异步 amqp python 库;特别是 Pika (它的一个变体,据说支持 Tornado)。

我编写的代码似乎成功从队列中读取,除了在请求结束时,我得到一个异常(浏览器返回正常):

[E 101219 01:07:35 web:868] Uncaught exception GET / (127.0.0.1)
    HTTPRequest(protocol='http', host='localhost:5000', method='GET', uri='/', version='HTTP/1.1', remote_ip='127.0.0.1', remote_ip='127.0.0.1', body='', headers={'Host': 'localhost:5000', 'Accept-Language': 'en-us,en;q=0.5', 'Accept-Encoding': 'gzip,deflate', 'Keep-Alive': '115', 'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8', 'User-Agent': 'Mozilla/5.0 (X11; U; Linux x86_64; en-US; rv:1.9.2.13) Gecko/20101206 Ubuntu/10.10 (maverick) Firefox/3.6.13', 'Accept-Charset': 'ISO-8859-1,utf-8;q=0.7,*;q=0.7', 'Connection': 'keep-alive', 'Cache-Control': 'max-age=0', 'If-None-Match': '"58f554b64ed24495235171596351069588d0260e"'})
    Traceback (most recent call last):
      File "/home/dave/devel/lib/python2.6/site-packages/tornado/web.py", line 810, in _stack_context
        yield
      File "/home/dave/devel/lib/python2.6/site-packages/tornado/stack_context.py", line 77, in StackContext
        yield
      File "/usr/lib/python2.6/contextlib.py", line 113, in nested
        yield vars
      File "/home/dave/lib/python2.6/site-packages/tornado/stack_context.py", line 126, in wrapped
        callback(*args, **kwargs)
      File "/home/dave/devel/src/pika/pika/tornado_adapter.py", line 42, in _handle_events
        self._handle_read()
      File "/home/dave/devel/src/pika/pika/tornado_adapter.py", line 66, in _handle_read
        self.on_data_available(chunk)
      File "/home/dave/devel/src/pika/pika/connection.py", line 521, in on_data_available
        self.channels[frame.channel_number].frame_handler(frame)
    KeyError: 1

我不完全确定我是否正确使用了这个库,所以我可能会做一些明显错误的事情。我的代码的基本流程是:

  1. 请求进来
  2. 使用TornadoConnection创建与RabbitMQ的连接;指定一个回调
  3. 在连接回调中,创建一个通道,声明/绑定我的队列,然后调用 basic_consume;指定一个回调
  4. 在consume回调中,关闭通道并调用Tornado的finish函数。
  5. 参见异常。

我的问题有几个:

  1. 这个流程正确吗?我不确定连接回调的目的是什么,只是如果我不使用它,它就不起作用。
  2. 我应该为每个 Web 请求创建一个 AMQP 连接吗? RabbitMQ 的文档表明,不,我不应该,但我应该坚持只创建通道。但是,我将在哪里创建连接,如果连接短暂中断,我该如何尝试重新连接?
  3. 如果我为每个 Web 请求创建一个 AMQP 连接,我应该在哪里关闭它?在我的回调中调用 amqp.close() 似乎会让事情变得更加糟糕。

稍后我将尝试提供一些示例代码,但是我上面描述的步骤相当完整地展示了事物的消费方面。我在发布方面也遇到了问题,但队列的消耗更为紧迫。

I have an AMQP server (RabbitMQ) that I would like to both publish and read from in a Tornado web server. To do this, I figured I would use an asynchronous amqp python library; in particular Pika (a variation of it that supposedly supports Tornado).

I have written code that appears to successfully read from the queue, except that at the end of the request, I get an exception (the browser returns fine):

[E 101219 01:07:35 web:868] Uncaught exception GET / (127.0.0.1)
    HTTPRequest(protocol='http', host='localhost:5000', method='GET', uri='/', version='HTTP/1.1', remote_ip='127.0.0.1', remote_ip='127.0.0.1', body='', headers={'Host': 'localhost:5000', 'Accept-Language': 'en-us,en;q=0.5', 'Accept-Encoding': 'gzip,deflate', 'Keep-Alive': '115', 'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8', 'User-Agent': 'Mozilla/5.0 (X11; U; Linux x86_64; en-US; rv:1.9.2.13) Gecko/20101206 Ubuntu/10.10 (maverick) Firefox/3.6.13', 'Accept-Charset': 'ISO-8859-1,utf-8;q=0.7,*;q=0.7', 'Connection': 'keep-alive', 'Cache-Control': 'max-age=0', 'If-None-Match': '"58f554b64ed24495235171596351069588d0260e"'})
    Traceback (most recent call last):
      File "/home/dave/devel/lib/python2.6/site-packages/tornado/web.py", line 810, in _stack_context
        yield
      File "/home/dave/devel/lib/python2.6/site-packages/tornado/stack_context.py", line 77, in StackContext
        yield
      File "/usr/lib/python2.6/contextlib.py", line 113, in nested
        yield vars
      File "/home/dave/lib/python2.6/site-packages/tornado/stack_context.py", line 126, in wrapped
        callback(*args, **kwargs)
      File "/home/dave/devel/src/pika/pika/tornado_adapter.py", line 42, in _handle_events
        self._handle_read()
      File "/home/dave/devel/src/pika/pika/tornado_adapter.py", line 66, in _handle_read
        self.on_data_available(chunk)
      File "/home/dave/devel/src/pika/pika/connection.py", line 521, in on_data_available
        self.channels[frame.channel_number].frame_handler(frame)
    KeyError: 1

I'm not entirely sure I am using this library correctly, so I might be doing something blatantly wrong. The basic flow of my code is:

  1. Request comes in
  2. Create connection to RabbitMQ using TornadoConnection; specify a callback
  3. In connection callback, create a channel, declare/bind my queue, and call basic_consume; specify a callback
  4. In consume callback, close the channel and call Tornado's finish function.
  5. See exception.

My questions are a few:

  1. Is this flow even correct? I'm not sure what the purpose of the connection callback is except that it doesn't work if I don't use it.
  2. Should I be creating one AMQP connection per web request? RabbitMQ's documentation suggests that no, I should not but rather I should stick to creating just channels. But where would I create the connection, and how do I attempt reconnects should it go down briefly?
  3. If I am creating one AMQP connection per Web request, where should I be closing it? Calling amqp.close() in my callback seems to screw things up even more.

I will try to have some sample code up a little later, but the steps I described above lay out the consuming side of things fairly completely. I am having issues with the publishing side as well, but the consuming of queues is more pressing.

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(2

静水深流 2024-10-15 12:11:34

查看一些源代码会有所帮助,但我在多个生产项目中使用了同一个支持龙卷风的鼠兔模块,没有出现任何问题。

您不想为每个请求创建一个连接。创建一个包装所有 AMQP 操作的类,并将其实例化为龙卷风应用程序级别的单例,可以跨请求(以及跨请求处理程序)使用。我在“runapp()”函数中执行此操作,该函数执行类似的操作,然后启动主龙卷风 ioloop。

这是一个名为“Events”的类。这是一个部分实现(具体来说,我在这里没有定义“self.handle_event”。这取决于您。

class Event(object):
  def __init__(self, config):
    self.host = 'localhost'
    self.port = '5672'
    self.vhost = '/'
    self.user = 'foo'
    self.exchange = 'myx'
    self.queue = 'myq'
    self.recv_routing_key = 'msgs4me'
    self.passwd = 'bar'

    self.connected = False 
    self.connect()


  def connect(self):

    credentials = pika.PlainCredentials(self.user, self.passwd)

    parameters = pika.ConnectionParameters(host = self.host,
                                         port = self.port,
                                         virtual_host = self.vhost,
                                         credentials = credentials)

    srs = pika.connection.SimpleReconnectionStrategy()

    logging.debug('Events: Connecting to AMQP Broker: %s:%i' % (self.host,
                                                              self.port))
    self.connection = tornado_adapter.TornadoConnection(parameters,
                                                      wait_for_open = False,
                                                      reconnection_strategy = srs,
                                                      callback = self.on_connected)

  def on_connected(self):

    # Open the channel
    logging.debug("Events: Opening a channel")
    self.channel = self.connection.channel()

    # Declare our exchange
    logging.debug("Events: Declaring the %s exchange" %  self.exchange)
    self.channel.exchange_declare(exchange = self.exchange,
                                type = "fanout",
                                auto_delete = False,
                                durable = True)

    # Declare our queue for this process
    logging.debug("Events: Declaring the %s queue" %  self.queue)
    self.channel.queue_declare(queue = self.queue,
                             auto_delete = False,
                             exclusive = False,
                             durable = True)


    # Bind to the exchange
    self.channel.queue_bind(exchange = self.exchange,
                          queue = self.queue,
                          routing_key = self.recv_routing_key)

    self.channel.basic_consume(consumer = self.handle_event, queue = self.queue, no_ack = True)

    # We should be connected if we made it this far
    self.connected = True

然后我将其放入名为“events.py”的文件中。我的 RequestHandler 和任何后端代码都使用“ common.py' 模块包装了对两者都有用的代码(我的 RequestHandler 不直接调用任何 amqp 模块方法——对于数据库、缓存等也是如此),所以我在模块级别定义了 'events=None' common.py,我实例化事件对象有点​​像这样:

import events

def runapp(config):
    if myapp.common.events is None: 
       myapp.common.events = myapp.events.Event(config)
    logging.debug("MYAPP.COMMON.EVENTS: %s", myapp.common.events)
    http_server = tornado.httpserver.HTTPServer(app,
                                            xheaders=config['HTTPServer']['xheaders'],
                                            no_keep_alive=config['HTTPServer']['no_keep_alive'])
    http_server.listen(port) 
    main_loop = tornado.ioloop.IOLoop.instance()
    logging.debug("MAIN IOLOOP: %s", main_loop)
    main_loop.start()

新年快乐:-D

It would help to see some source code, but I use this same tornado-supporting pika module without issue in more than one production project.

You don't want to create a connection per request. Create a class that wraps all of your AMQP operations, and instantiate it as a singleton at the tornado Application level that can be used across requests (and across request handlers). I do this in a 'runapp()' function that does some stuff like this and then starts the main tornado ioloop.

Here's a class called 'Events'. It's a partial implementation (specifically, I don't define 'self.handle_event' here. That's up to you.

class Event(object):
  def __init__(self, config):
    self.host = 'localhost'
    self.port = '5672'
    self.vhost = '/'
    self.user = 'foo'
    self.exchange = 'myx'
    self.queue = 'myq'
    self.recv_routing_key = 'msgs4me'
    self.passwd = 'bar'

    self.connected = False 
    self.connect()


  def connect(self):

    credentials = pika.PlainCredentials(self.user, self.passwd)

    parameters = pika.ConnectionParameters(host = self.host,
                                         port = self.port,
                                         virtual_host = self.vhost,
                                         credentials = credentials)

    srs = pika.connection.SimpleReconnectionStrategy()

    logging.debug('Events: Connecting to AMQP Broker: %s:%i' % (self.host,
                                                              self.port))
    self.connection = tornado_adapter.TornadoConnection(parameters,
                                                      wait_for_open = False,
                                                      reconnection_strategy = srs,
                                                      callback = self.on_connected)

  def on_connected(self):

    # Open the channel
    logging.debug("Events: Opening a channel")
    self.channel = self.connection.channel()

    # Declare our exchange
    logging.debug("Events: Declaring the %s exchange" %  self.exchange)
    self.channel.exchange_declare(exchange = self.exchange,
                                type = "fanout",
                                auto_delete = False,
                                durable = True)

    # Declare our queue for this process
    logging.debug("Events: Declaring the %s queue" %  self.queue)
    self.channel.queue_declare(queue = self.queue,
                             auto_delete = False,
                             exclusive = False,
                             durable = True)


    # Bind to the exchange
    self.channel.queue_bind(exchange = self.exchange,
                          queue = self.queue,
                          routing_key = self.recv_routing_key)

    self.channel.basic_consume(consumer = self.handle_event, queue = self.queue, no_ack = True)

    # We should be connected if we made it this far
    self.connected = True

And then I put that in a file called 'events.py'. My RequestHandlers and any back end code all utilize a 'common.py' module that wraps code that's useful to both (my RequestHandlers don't call any amqp module methods directly -- same for db, cache, etc as well), so I define 'events=None' at the module level in common.py, and I instantiate the Event object kinda like this:

import events

def runapp(config):
    if myapp.common.events is None: 
       myapp.common.events = myapp.events.Event(config)
    logging.debug("MYAPP.COMMON.EVENTS: %s", myapp.common.events)
    http_server = tornado.httpserver.HTTPServer(app,
                                            xheaders=config['HTTPServer']['xheaders'],
                                            no_keep_alive=config['HTTPServer']['no_keep_alive'])
    http_server.listen(port) 
    main_loop = tornado.ioloop.IOLoop.instance()
    logging.debug("MAIN IOLOOP: %s", main_loop)
    main_loop.start()

Happy new year :-D

无语# 2024-10-15 12:11:34

有人在此处报告了成功合并 Tornado 和 Pika 的情况。据我所知,这并不像从 Tornado 中调用 Pika 那么简单,因为两个库都希望拥有自己的事件循环。

Someone has reported success in merging Tornado and Pika here. From what I can tell, it isn't as simple as just calling Pika from Tornado, since both libraries want to have their own event loops in charge.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文