我有一个 AMQP 服务器 (RabbitMQ),我想在 Tornado 网络服务器。为此,我想我会使用异步 amqp python 库;特别是 Pika (它的一个变体,据说支持 Tornado)。
我编写的代码似乎成功从队列中读取,除了在请求结束时,我得到一个异常(浏览器返回正常):
[E 101219 01:07:35 web:868] Uncaught exception GET / (127.0.0.1)
HTTPRequest(protocol='http', host='localhost:5000', method='GET', uri='/', version='HTTP/1.1', remote_ip='127.0.0.1', remote_ip='127.0.0.1', body='', headers={'Host': 'localhost:5000', 'Accept-Language': 'en-us,en;q=0.5', 'Accept-Encoding': 'gzip,deflate', 'Keep-Alive': '115', 'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8', 'User-Agent': 'Mozilla/5.0 (X11; U; Linux x86_64; en-US; rv:1.9.2.13) Gecko/20101206 Ubuntu/10.10 (maverick) Firefox/3.6.13', 'Accept-Charset': 'ISO-8859-1,utf-8;q=0.7,*;q=0.7', 'Connection': 'keep-alive', 'Cache-Control': 'max-age=0', 'If-None-Match': '"58f554b64ed24495235171596351069588d0260e"'})
Traceback (most recent call last):
File "/home/dave/devel/lib/python2.6/site-packages/tornado/web.py", line 810, in _stack_context
yield
File "/home/dave/devel/lib/python2.6/site-packages/tornado/stack_context.py", line 77, in StackContext
yield
File "/usr/lib/python2.6/contextlib.py", line 113, in nested
yield vars
File "/home/dave/lib/python2.6/site-packages/tornado/stack_context.py", line 126, in wrapped
callback(*args, **kwargs)
File "/home/dave/devel/src/pika/pika/tornado_adapter.py", line 42, in _handle_events
self._handle_read()
File "/home/dave/devel/src/pika/pika/tornado_adapter.py", line 66, in _handle_read
self.on_data_available(chunk)
File "/home/dave/devel/src/pika/pika/connection.py", line 521, in on_data_available
self.channels[frame.channel_number].frame_handler(frame)
KeyError: 1
我不完全确定我是否正确使用了这个库,所以我可能会做一些明显错误的事情。我的代码的基本流程是:
- 请求进来
- 使用TornadoConnection创建与RabbitMQ的连接;指定一个回调
- 在连接回调中,创建一个通道,声明/绑定我的队列,然后调用 basic_consume;指定一个回调
- 在consume回调中,关闭通道并调用Tornado的finish函数。
- 参见异常。
我的问题有几个:
- 这个流程正确吗?我不确定连接回调的目的是什么,只是如果我不使用它,它就不起作用。
- 我应该为每个 Web 请求创建一个 AMQP 连接吗? RabbitMQ 的文档表明,不,我不应该,但我应该坚持只创建通道。但是,我将在哪里创建连接,如果连接短暂中断,我该如何尝试重新连接?
- 如果我为每个 Web 请求创建一个 AMQP 连接,我应该在哪里关闭它?在我的回调中调用 amqp.close() 似乎会让事情变得更加糟糕。
稍后我将尝试提供一些示例代码,但是我上面描述的步骤相当完整地展示了事物的消费方面。我在发布方面也遇到了问题,但队列的消耗更为紧迫。
I have an AMQP server (RabbitMQ) that I would like to both publish and read from in a Tornado web server. To do this, I figured I would use an asynchronous amqp python library; in particular Pika (a variation of it that supposedly supports Tornado).
I have written code that appears to successfully read from the queue, except that at the end of the request, I get an exception (the browser returns fine):
[E 101219 01:07:35 web:868] Uncaught exception GET / (127.0.0.1)
HTTPRequest(protocol='http', host='localhost:5000', method='GET', uri='/', version='HTTP/1.1', remote_ip='127.0.0.1', remote_ip='127.0.0.1', body='', headers={'Host': 'localhost:5000', 'Accept-Language': 'en-us,en;q=0.5', 'Accept-Encoding': 'gzip,deflate', 'Keep-Alive': '115', 'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8', 'User-Agent': 'Mozilla/5.0 (X11; U; Linux x86_64; en-US; rv:1.9.2.13) Gecko/20101206 Ubuntu/10.10 (maverick) Firefox/3.6.13', 'Accept-Charset': 'ISO-8859-1,utf-8;q=0.7,*;q=0.7', 'Connection': 'keep-alive', 'Cache-Control': 'max-age=0', 'If-None-Match': '"58f554b64ed24495235171596351069588d0260e"'})
Traceback (most recent call last):
File "/home/dave/devel/lib/python2.6/site-packages/tornado/web.py", line 810, in _stack_context
yield
File "/home/dave/devel/lib/python2.6/site-packages/tornado/stack_context.py", line 77, in StackContext
yield
File "/usr/lib/python2.6/contextlib.py", line 113, in nested
yield vars
File "/home/dave/lib/python2.6/site-packages/tornado/stack_context.py", line 126, in wrapped
callback(*args, **kwargs)
File "/home/dave/devel/src/pika/pika/tornado_adapter.py", line 42, in _handle_events
self._handle_read()
File "/home/dave/devel/src/pika/pika/tornado_adapter.py", line 66, in _handle_read
self.on_data_available(chunk)
File "/home/dave/devel/src/pika/pika/connection.py", line 521, in on_data_available
self.channels[frame.channel_number].frame_handler(frame)
KeyError: 1
I'm not entirely sure I am using this library correctly, so I might be doing something blatantly wrong. The basic flow of my code is:
- Request comes in
- Create connection to RabbitMQ using TornadoConnection; specify a callback
- In connection callback, create a channel, declare/bind my queue, and call basic_consume; specify a callback
- In consume callback, close the channel and call Tornado's finish function.
- See exception.
My questions are a few:
- Is this flow even correct? I'm not sure what the purpose of the connection callback is except that it doesn't work if I don't use it.
- Should I be creating one AMQP connection per web request? RabbitMQ's documentation suggests that no, I should not but rather I should stick to creating just channels. But where would I create the connection, and how do I attempt reconnects should it go down briefly?
- If I am creating one AMQP connection per Web request, where should I be closing it? Calling amqp.close() in my callback seems to screw things up even more.
I will try to have some sample code up a little later, but the steps I described above lay out the consuming side of things fairly completely. I am having issues with the publishing side as well, but the consuming of queues is more pressing.
发布评论
评论(2)
查看一些源代码会有所帮助,但我在多个生产项目中使用了同一个支持龙卷风的鼠兔模块,没有出现任何问题。
您不想为每个请求创建一个连接。创建一个包装所有 AMQP 操作的类,并将其实例化为龙卷风应用程序级别的单例,可以跨请求(以及跨请求处理程序)使用。我在“runapp()”函数中执行此操作,该函数执行类似的操作,然后启动主龙卷风 ioloop。
这是一个名为“Events”的类。这是一个部分实现(具体来说,我在这里没有定义“self.handle_event”。这取决于您。
然后我将其放入名为“events.py”的文件中。我的 RequestHandler 和任何后端代码都使用“ common.py' 模块包装了对两者都有用的代码(我的 RequestHandler 不直接调用任何 amqp 模块方法——对于数据库、缓存等也是如此),所以我在模块级别定义了 'events=None' common.py,我实例化事件对象有点像这样:
新年快乐:-D
It would help to see some source code, but I use this same tornado-supporting pika module without issue in more than one production project.
You don't want to create a connection per request. Create a class that wraps all of your AMQP operations, and instantiate it as a singleton at the tornado Application level that can be used across requests (and across request handlers). I do this in a 'runapp()' function that does some stuff like this and then starts the main tornado ioloop.
Here's a class called 'Events'. It's a partial implementation (specifically, I don't define 'self.handle_event' here. That's up to you.
And then I put that in a file called 'events.py'. My RequestHandlers and any back end code all utilize a 'common.py' module that wraps code that's useful to both (my RequestHandlers don't call any amqp module methods directly -- same for db, cache, etc as well), so I define 'events=None' at the module level in common.py, and I instantiate the Event object kinda like this:
Happy new year :-D
有人在此处报告了成功合并 Tornado 和 Pika 的情况。据我所知,这并不像从 Tornado 中调用 Pika 那么简单,因为两个库都希望拥有自己的事件循环。
Someone has reported success in merging Tornado and Pika here. From what I can tell, it isn't as simple as just calling Pika from Tornado, since both libraries want to have their own event loops in charge.