非阻塞Redis pubsub 可能吗?
我想使用redis的pubsub来传输一些消息,但不想使用listen
被阻止,就像下面的代码:
import redis
rc = redis.Redis()
ps = rc.pubsub()
ps.subscribe(['foo', 'bar'])
rc.publish('foo', 'hello world')
for item in ps.listen():
if item['type'] == 'message':
print item['channel']
print item['data']
最后一个for
部分将被阻止。我只想检查给定通道是否有数据,我该如何实现这一点?有类似check
的方法吗?
I want to use redis' pubsub to transmit some messages, but don't want be blocked using listen
, like the code below:
import redis
rc = redis.Redis()
ps = rc.pubsub()
ps.subscribe(['foo', 'bar'])
rc.publish('foo', 'hello world')
for item in ps.listen():
if item['type'] == 'message':
print item['channel']
print item['data']
The last for
section will block. I just want to check if a given channel has data, how can I accomplish this? Is there a check
like method?
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论
评论(11)
如果您正在考虑非阻塞、异步处理,您可能正在使用(或应该使用)异步框架/服务器。
如果您使用Tornado,则有Tornado-Redis。它使用本机 Tornado 生成器调用。其 Websocket 演示 提供了如何将其与pub/sub。
如果您使用Twisted,则有txRedis。那里还有 pub/sub 示例。
似乎您还可以将 Redis-py 与 Gevent 结合使用,使用 Gevent 的猴子补丁(
gevent.monkey.patch_all()
)。更新:
距离最初的答案已经过去 5 年了,与此同时 Python 获得了原生异步 IO 支持。现在有 AIORedis,一个异步 IO Redis 客户端。
If you're thinking of non-blocking, asynchronous processing, you're probably using (or should use) asynchronous framework/server.
if you're using Tornado, there is Tornado-Redis. It's using native Tornado generator calls. Its Websocket demo provides example on how to use it in combination with pub/sub.
if you're using Twisted, there is txRedis. There you also have pub/sub example.
it also seems that you can use Redis-py combined with Gevent with no problems using Gevent's monkey patching (
gevent.monkey.patch_all()
).UPDATE:
It's been 5 years since the original answer, in the mean time Python got native async IO support. There now is AIORedis, an async IO Redis client.
接受的答案已过时,因为 redis-py 建议您使用非阻塞
get_message()
。但它也提供了一种轻松使用线程的方法。https://pypi.python.org/pypi/redis
读取消息有三种不同的策略。
在幕后,get_message() 使用系统的“select”模块快速轮询连接的套接字。如果有可供读取的数据,get_message() 将读取它、格式化消息并将其返回或将其传递给消息处理程序。如果没有数据可读,get_message()将立即返回None。这使得集成到应用程序内现有的事件循环变得很简单。
旧版本的 redis-py 只能使用 pubsub.listen() 读取消息。 Listen() 是一个生成器,它会阻塞直到有消息可用。如果您的应用程序除了接收并处理从 Redis 收到的消息之外不需要执行任何其他操作,那么 Listen() 是启动运行的简单方法。
第三个选项在单独的线程中运行事件循环。 pubsub.run_in_thread() 创建一个新线程并启动事件循环。线程对象返回给 run_in_thread() 的调用者。调用者可以使用 thread.stop() 方法关闭事件循环和线程。在幕后,这只是 get_message() 的一个包装器,它在单独的线程中运行,本质上为您创建一个微小的非阻塞事件循环。 run_in_thread() 采用可选的 sleep_time 参数。如果指定,事件循环将使用循环每次迭代中的值调用 time.sleep() 。
注意:由于我们在单独的线程中运行,因此无法处理未使用注册消息处理程序自动处理的消息。因此,如果您订阅了没有附加消息处理程序的模式或通道,redis-py 会阻止您调用 run_in_thread() 。
因此,要回答您的问题,当您想知道消息是否已到达时,只需检查 get_message 即可。
Accepted answer is obsolete as redis-py recommends you to use the non-blocking
get_message()
. But it also provides a way to easily use threads.https://pypi.python.org/pypi/redis
There are three different strategies for reading messages.
Behind the scenes, get_message() uses the system’s ‘select’ module to quickly poll the connection’s socket. If there’s data available to be read, get_message() will read it, format the message and return it or pass it to a message handler. If there’s no data to be read, get_message() will immediately return None. This makes it trivial to integrate into an existing event loop inside your application.
Older versions of redis-py only read messages with pubsub.listen(). listen() is a generator that blocks until a message is available. If your application doesn’t need to do anything else but receive and act on messages received from redis, listen() is an easy way to get up an running.
The third option runs an event loop in a separate thread. pubsub.run_in_thread() creates a new thread and starts the event loop. The thread object is returned to the caller of run_in_thread(). The caller can use the thread.stop() method to shut down the event loop and thread. Behind the scenes, this is simply a wrapper around get_message() that runs in a separate thread, essentially creating a tiny non-blocking event loop for you. run_in_thread() takes an optional sleep_time argument. If specified, the event loop will call time.sleep() with the value in each iteration of the loop.
Note: Since we’re running in a separate thread, there’s no way to handle messages that aren’t automatically handled with registered message handlers. Therefore, redis-py prevents you from calling run_in_thread() if you’re subscribed to patterns or channels that don’t have message handlers attached.
So to answer you question, just check get_message when you want to know if a message has arrived.
新版本的redis-py已经支持异步pubsub,查看https://github.com/andymccurdy/redis -py 了解更多详细信息。
这是文档本身的示例:
The new version of redis-py has support for asynchronous pubsub, check https://github.com/andymccurdy/redis-py for more details.
Here's an example from the documentation itself:
我认为这是不可能的。通道没有任何“当前数据”,您订阅通道并开始接收通道上其他客户端推送的消息,因此它是一个阻塞 API。另外,如果您查看 Redis 的 pub/sub 的命令文档,它会更清楚。
I don't think that would be possible. A Channel doesn't have any "current data", you subscribe to a channel and start receiving messages that are being pushed by other clients on the channel, hence it is a blocking API. Also if you look at the Redis Commands documentation for pub/sub it would make it more clear.
这是一个线程阻塞侦听器的工作示例。
This is a working example to thread the blocking listener.
这是一个没有线程的非阻塞解决方案:
ps.get_message() 本身就足够了,但我使用此方法,以便我可以等待多个 fd,而不仅仅是 redis 连接。
Here is a nonblocking solution without threads:
ps.get_message()
is enough on its own, but I use this method so that I can wait on multiple fds instead of just the redis connection.要达到无阻塞代码,您必须执行另一种范例代码。这并不难,使用一个新线程来监听所有更改并让主线程去做其他事情。
此外,您还需要某种机制来在主线程和 Redis 订阅者线程之间交换数据。
To reach a none blocking code you must do another kind of paradigm code. It's not hard, using a new thread to listen all changes and leaving main thread to do another things.
Also, you will need some mechanism to interchange data between main thread and redis subscriber thread.
最有效的方法是基于 greenlet 而不是基于线程。作为一个基于greenlet的并发框架,gevent在Python世界中已经相当成熟。因此,gevent 与 redis-py 的集成将是非常棒的。这正是 github 上本期讨论的内容:
https://github.com/andymccurdy /redis-py/issues/310
The most efficient approach would be greenlet-based rather than thread-based. As a greenlet-based concurrency framework, gevent is already quite established in the Python world. A gevent integration with redis-py would be therefore be wonderful. That is exactly what's being discussed in this issue on github:
https://github.com/andymccurdy/redis-py/issues/310
您可以使用 gevent、gevent 猴子修补来构建非阻塞的 redis pubsub 应用程序。
You can use gevent, gevent monkey patching to build a non-blocking redis pubsub app.
Redis 的 pub/sub 向在通道上订阅(侦听)的客户端发送消息。如果您没有在听,您将错过该消息(因此会阻塞呼叫)。如果你想让它成为非阻塞的,我建议使用队列(redis 在这方面也很擅长)。如果您必须使用 pub/sub,您可以按照建议使用 gevent 来拥有异步、阻塞侦听器,将消息推送到队列,并使用单独的使用者以非阻塞方式处理来自该队列的消息。
Redis' pub/sub sends messages to clients subscribed (listening) on a channel. If you are not listening, you will miss the message (hence the blocking call). If you want to have it non-blocking, I recommend using a queue instead (redis is pretty good at that too). If you have to use pub/sub you can use as suggested gevent to have a asynchronous, blocking listener, push messages to a queue and use a separate consumer to process messages from that queue in a non-blocking way.
这非常简单。我们检查消息是否存在,然后继续遍历订阅,直到所有消息都已处理完毕。
This is pretty straightforward. We check if a message exist, and continue go through the subscription until all messages have been processed.