非阻塞Redis pubsub 可能吗?

发布于 2024-12-11 23:01:37 字数 410 浏览 2 评论 0原文

我想使用redis的pubsub来传输一些消息,但不想使用listen被阻止,就像下面的代码:

import redis
rc = redis.Redis()

ps = rc.pubsub()
ps.subscribe(['foo', 'bar'])

rc.publish('foo', 'hello world')

for item in ps.listen():
    if item['type'] == 'message':
        print item['channel']
        print item['data']

最后一个for部分将被阻止。我只想检查给定通道是否有数据,我该如何实现这一点?有类似check的方法吗?

I want to use redis' pubsub to transmit some messages, but don't want be blocked using listen, like the code below:

import redis
rc = redis.Redis()

ps = rc.pubsub()
ps.subscribe(['foo', 'bar'])

rc.publish('foo', 'hello world')

for item in ps.listen():
    if item['type'] == 'message':
        print item['channel']
        print item['data']

The last for section will block. I just want to check if a given channel has data, how can I accomplish this? Is there a check like method?

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(11

你的往事 2024-12-18 23:01:37

如果您正在考虑非阻塞、异步处理,您可能正在使用(或应该使用)异步框架/服务器。

更新:
距离最初的答案已经过去 5 年了,与此同时 Python 获得了原生异步 IO 支持。现在有 AIORedis,一个异步 IO Redis 客户端

If you're thinking of non-blocking, asynchronous processing, you're probably using (or should use) asynchronous framework/server.

UPDATE:
It's been 5 years since the original answer, in the mean time Python got native async IO support. There now is AIORedis, an async IO Redis client.

樱桃奶球 2024-12-18 23:01:37

接受的答案已过时,因为 redis-py 建议您使用非阻塞 get_message()。但它也提供了一种轻松使用线程的方法。

https://pypi.python.org/pypi/redis

读取消息有三种不同的策略。

在幕后,get_message() 使用系统的“select”模块快速轮询连接的套接字。如果有可供读取的数据,get_message() 将读取它、格式化消息并将其返回或将其传递给消息处理程序。如果没有数据可读,get_message()将立即返回None。这使得集成到应用程序内现有的事件循环变得很简单。

 while True:
     message = p.get_message()
     if message:
         # do something with the message
     time.sleep(0.001)  # be nice to the system :)

旧版本的 redis-py 只能使用 pubsub.listen() 读取消息。 Listen() 是一个生成器,它会阻塞直到有消息可用。如果您的应用程序除了接收并处理从 Redis 收到的消息之外不需要执行任何其他操作,那么 Listen() 是启动运行的简单方法。

 for message in p.listen():
     # do something with the message

第三个选项在单独的线程中运行事件循环。 pubsub.run_in_thread() 创建一个新线程并启动事件循环。线程对象返回给 run_in_thread() 的调用者。调用者可以使用 thread.stop() 方法关闭事件循环和线程。在幕后,这只是 get_message() 的一个包装器,它在单独的线程中运行,本质上为您创建一个微小的非阻塞事件循环。 run_in_thread() 采用可选的 sleep_time 参数。如果指定,事件循环将使用循环每次迭代中的值调用 time.sleep() 。

注意:由于我们在单独的线程中运行,因此无法处理未使用注册消息处理程序自动处理的消息。因此,如果您订阅了没有附加消息处理程序的模式或通道,redis-py 会阻止您调用 run_in_thread() 。

p.subscribe(**{'my-channel': my_handler})
thread = p.run_in_thread(sleep_time=0.001)
# the event loop is now running in the background processing messages
# when it's time to shut it down...
thread.stop()

因此,要回答您的问题,当您想知道消息是否已到达时,只需检查 get_message 即可。

Accepted answer is obsolete as redis-py recommends you to use the non-blocking get_message(). But it also provides a way to easily use threads.

https://pypi.python.org/pypi/redis

There are three different strategies for reading messages.

Behind the scenes, get_message() uses the system’s ‘select’ module to quickly poll the connection’s socket. If there’s data available to be read, get_message() will read it, format the message and return it or pass it to a message handler. If there’s no data to be read, get_message() will immediately return None. This makes it trivial to integrate into an existing event loop inside your application.

 while True:
     message = p.get_message()
     if message:
         # do something with the message
     time.sleep(0.001)  # be nice to the system :)

Older versions of redis-py only read messages with pubsub.listen(). listen() is a generator that blocks until a message is available. If your application doesn’t need to do anything else but receive and act on messages received from redis, listen() is an easy way to get up an running.

 for message in p.listen():
     # do something with the message

The third option runs an event loop in a separate thread. pubsub.run_in_thread() creates a new thread and starts the event loop. The thread object is returned to the caller of run_in_thread(). The caller can use the thread.stop() method to shut down the event loop and thread. Behind the scenes, this is simply a wrapper around get_message() that runs in a separate thread, essentially creating a tiny non-blocking event loop for you. run_in_thread() takes an optional sleep_time argument. If specified, the event loop will call time.sleep() with the value in each iteration of the loop.

Note: Since we’re running in a separate thread, there’s no way to handle messages that aren’t automatically handled with registered message handlers. Therefore, redis-py prevents you from calling run_in_thread() if you’re subscribed to patterns or channels that don’t have message handlers attached.

p.subscribe(**{'my-channel': my_handler})
thread = p.run_in_thread(sleep_time=0.001)
# the event loop is now running in the background processing messages
# when it's time to shut it down...
thread.stop()

So to answer you question, just check get_message when you want to know if a message has arrived.

入画浅相思 2024-12-18 23:01:37

新版本的redis-py已经支持异步pubsub,查看https://github.com/andymccurdy/redis -py 了解更多详细信息。
这是文档本身的示例:

while True:
    message = p.get_message()
    if message:
        # do something with the message
    time.sleep(0.001)  # be nice to the system :)

The new version of redis-py has support for asynchronous pubsub, check https://github.com/andymccurdy/redis-py for more details.
Here's an example from the documentation itself:

while True:
    message = p.get_message()
    if message:
        # do something with the message
    time.sleep(0.001)  # be nice to the system :)
日久见人心 2024-12-18 23:01:37

我认为这是不可能的。通道没有任何“当前数据”,您订阅通道并开始接收通道上其他客户端推送的消息,因此它是一个阻塞 API。另外,如果您查看 Redis 的 pub/sub 的命令文档,它会更清楚。

I don't think that would be possible. A Channel doesn't have any "current data", you subscribe to a channel and start receiving messages that are being pushed by other clients on the channel, hence it is a blocking API. Also if you look at the Redis Commands documentation for pub/sub it would make it more clear.

柒夜笙歌凉 2024-12-18 23:01:37

这是一个线程阻塞侦听器的工作示例。

import sys
import cmd
import redis
import threading


def monitor():
    r = redis.Redis(YOURHOST, YOURPORT, YOURPASSWORD, db=0)

    channel = sys.argv[1]
    p = r.pubsub()
    p.subscribe(channel)

    print 'monitoring channel', channel
    for m in p.listen():
        print m['data']


class my_cmd(cmd.Cmd):
    """Simple command processor example."""

    def do_start(self, line):
        my_thread.start()

    def do_EOF(self, line):
        return True


if __name__ == '__main__':
    if len(sys.argv) == 1:
        print "missing argument! please provide the channel name."
    else:
        my_thread = threading.Thread(target=monitor)
        my_thread.setDaemon(True)

        my_cmd().cmdloop()

This is a working example to thread the blocking listener.

import sys
import cmd
import redis
import threading


def monitor():
    r = redis.Redis(YOURHOST, YOURPORT, YOURPASSWORD, db=0)

    channel = sys.argv[1]
    p = r.pubsub()
    p.subscribe(channel)

    print 'monitoring channel', channel
    for m in p.listen():
        print m['data']


class my_cmd(cmd.Cmd):
    """Simple command processor example."""

    def do_start(self, line):
        my_thread.start()

    def do_EOF(self, line):
        return True


if __name__ == '__main__':
    if len(sys.argv) == 1:
        print "missing argument! please provide the channel name."
    else:
        my_thread = threading.Thread(target=monitor)
        my_thread.setDaemon(True)

        my_cmd().cmdloop()
野味少女 2024-12-18 23:01:37

这是一个没有线程的非阻塞解决方案:

fd = ps.connection._sock.fileno();
rlist,, = select.select([fd], [], [], 0) # or replace 0 with None to block
if rlist:
    for rfd in rlist:
        if fd == rfd:
            message = ps.get_message()

ps.get_message() 本身就足够了,但我使用此方法,以便我可以等待多个 fd,而不仅仅是 redis 连接。

Here is a nonblocking solution without threads:

fd = ps.connection._sock.fileno();
rlist,, = select.select([fd], [], [], 0) # or replace 0 with None to block
if rlist:
    for rfd in rlist:
        if fd == rfd:
            message = ps.get_message()

ps.get_message() is enough on its own, but I use this method so that I can wait on multiple fds instead of just the redis connection.

半步萧音过轻尘 2024-12-18 23:01:37

要达到无阻塞代码,您必须执行另一种范例代码。这并不难,使用一个新线程来监听所有更改并让主线程去做其他事情。

此外,您还需要某种机制来在主线程和 Redis 订阅者线程之间交换数据。

To reach a none blocking code you must do another kind of paradigm code. It's not hard, using a new thread to listen all changes and leaving main thread to do another things.

Also, you will need some mechanism to interchange data between main thread and redis subscriber thread.

○闲身 2024-12-18 23:01:37

最有效的方法是基于 greenlet 而不是基于线程。作为一个基于greenlet的并发框架,gevent在Python世界中已经相当成熟。因此,gevent 与 redis-py 的集成将是非常棒的。这正是 github 上本期讨论的内容:

https://github.com/andymccurdy /redis-py/issues/310

The most efficient approach would be greenlet-based rather than thread-based. As a greenlet-based concurrency framework, gevent is already quite established in the Python world. A gevent integration with redis-py would be therefore be wonderful. That is exactly what's being discussed in this issue on github:

https://github.com/andymccurdy/redis-py/issues/310

荆棘i 2024-12-18 23:01:37

您可以使用 gevent、gevent 猴子修补来构建非阻塞的 redis pubsub 应用程序。

You can use gevent, gevent monkey patching to build a non-blocking redis pubsub app.

肤浅与狂妄 2024-12-18 23:01:37

Redis 的 pub/sub 向在通道上订阅(侦听)的客户端发送消息。如果您没有在听,您将错过该消息(因此会阻塞呼叫)。如果你想让它成为非阻塞的,我建议使用队列(redis 在这方面也很擅长)。如果您必须使用 pub/sub,您可以按照建议使用 gevent 来拥有异步、阻塞侦听器,将消息推送到队列,并使用单独的使用者以非阻塞方式处理来自该队列的消息。

Redis' pub/sub sends messages to clients subscribed (listening) on a channel. If you are not listening, you will miss the message (hence the blocking call). If you want to have it non-blocking, I recommend using a queue instead (redis is pretty good at that too). If you have to use pub/sub you can use as suggested gevent to have a asynchronous, blocking listener, push messages to a queue and use a separate consumer to process messages from that queue in a non-blocking way.

权谋诡计 2024-12-18 23:01:37

这非常简单。我们检查消息是否存在,然后继续遍历订阅,直到所有消息都已处理完毕。

import redis

r = redis.Redis(decode_responses=True)
subscription = r.pubsub()
subscription.psubscribe('channel')

r.publish('channel', 'foo')
r.publish('channel', 'bar')
r.publish('channel', 'baz')

message = subscription.get_message()
while message is not None:
  if message['data'] != 1:
      # Do something with message
      print(message)
  # Get next message
  message = subscription.get_message()

This is pretty straightforward. We check if a message exist, and continue go through the subscription until all messages have been processed.

import redis

r = redis.Redis(decode_responses=True)
subscription = r.pubsub()
subscription.psubscribe('channel')

r.publish('channel', 'foo')
r.publish('channel', 'bar')
r.publish('channel', 'baz')

message = subscription.get_message()
while message is not None:
  if message['data'] != 1:
      # Do something with message
      print(message)
  # Get next message
  message = subscription.get_message()
~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文