蟒蛇 & Redis:Manager/Worker 应用程序最佳实践

发布于 2024-11-15 14:27:15 字数 2938 浏览 7 评论 0原文

关于使用 Python 和 Redis 创建用于运行异步命令的作业队列应用程序,我有一些常见问题。这是我到目前为止生成的代码:

def queueCmd(cmd):
    r_server.rpush("cmds", cmd)

def printCmdQueue():
    print r_server.lrange("cmds", 0 , -1)

def work():
    print "command being consumed: ", r_server.lpop("cmds")
    return -1

def boom(info):
    print "pop goes the weasel"

if __name__ == '__main__':

    r_server = redis.Redis("localhost")

    queueCmd("ls -la;sleep 10;ls")
    queueCmd("mkdir test; sleep 20")
    queueCmd("ls -la;sleep 10;ls")
    queueCmd("mkdir test; sleep 20")
    queueCmd("ls -la;sleep 10;ls")
    queueCmd("mkdir test; sleep 20")

    printCmdQueue()

    pool = Pool(processes=2)

    print "cnt:", +r_server.llen("cmds")
    #while r_server.llen("cmds") > 0:
    while True:
        pool.apply_async(work, callback=boom)
        if not r_server.lrange("cmds", 0, -1):
        #if r_server.llen("cmds") == 0:
            print "Terminate pool"
            pool.terminate()
            break

    printCmdQueue()

首先,我是否正确地相信,如果我需要与经理进行任何通信,我想通过回调来做到这一点?我在此看到的快速示例使用将异步调用存储在结果中并通过 result.get(timeout=1) 访问它。通过通信,我的意思是把东西放回到 Redis 列表中。

编辑:如果命令以异步方式运行,并且我在主程序内的结果上超时,那么是工作线程超时还是管理器内的操作超时?如果经理不能用它来检查工作人员的退出代码就好了?

接下来,此代码会产生以下输出:

['ls -la;sleep 10;ls', 'mkdir test; sleep 20', 'ls -la;sleep 10;ls', 'mkdir test; sleep 20', 'ls -la;sleep 10;ls', 'mkdir test; sleep 20']
command being consumed:  ['mkdir test; sleep 20', 'ls -la;sleep 10;ls', 'mkdir test; sleep 20', 'ls -la;sleep 10;ls', 'mkdir test; sleep 20']
pop goes the weasel
command being consumed:  ['ls -la;sleep 10;ls', 'mkdir test; sleep 20', 'ls -la;sleep 10;ls', 'mkdir test; sleep 20']
command being consumed:  mkdir test; sleep 20
pop goes the weasel
pop goes the weasel
command being consumed:  ['ls -la;sleep 10;ls', 'mkdir test; sleep 20']
pop goes the weasel
command being consumed:  ['ls -la;sleep 10;ls', 'mkdir test; sleep 20']
command being consumed:  mkdir test; sleep 20
Terminate pool
command being consumed:  None
 pop goes the weasel
pop goes the weasel
pop goes the weasel
[]

为什么工作人员想要一次使用多个 cmd,即使我一次弹出一个命令?同样,这并不总是能很好地结束,有时需要 ctrl+c。为了对付他,我清理了队列并再次前往。我认为这与 apply_sync() 和 if 退出循环有关。我想知道工人方面是否需要做更多事情?

如果我将 if 更改为注释掉的那个,我会得到:

ValueError: invalid literal for int() with base 10: 'ls -la;sleep 10;ls'

这似乎是一种更好的方法来检查是否需要中断,但似乎函数有时返回字符串文字?

任何有关改进这一点的建议将不胜感激。我只是想创建一个管理器,就像 Linux 机器上的服务/守护进程一样。它将用于从 Redis 列表中获取作业(当前是命令,但可能更多)并将结果返回到 Redis 列表中。然后,GUI 将与该管理器交互以获取队列状态并返回结果。

谢谢,

编辑:

我意识到我有点傻了。我不需要从工作人员访问 Redis 服务器,这会导致一些错误(特别是 ValueError)。

为了解决这个问题,循环现在是:

while not r_server.llen("cmds") == 0:
    cmd = r_server.lpop("cmds")
    pool.apply_async(work, [cmd])

在这些行之后,我调用pool.close()。我使用 os.getpid() 和 os.getppid() 来检查我是否确实有多个孩子到处乱跑。

如果这听起来像是创建使用 redis 的管理器/工作人员应用程序的好方法,我仍然会很高兴听到。

I have a few general questions about using Python and Redis to create a job queue application for running asynchronous commands. Here is the code I have generated so far:

def queueCmd(cmd):
    r_server.rpush("cmds", cmd)

def printCmdQueue():
    print r_server.lrange("cmds", 0 , -1)

def work():
    print "command being consumed: ", r_server.lpop("cmds")
    return -1

def boom(info):
    print "pop goes the weasel"

if __name__ == '__main__':

    r_server = redis.Redis("localhost")

    queueCmd("ls -la;sleep 10;ls")
    queueCmd("mkdir test; sleep 20")
    queueCmd("ls -la;sleep 10;ls")
    queueCmd("mkdir test; sleep 20")
    queueCmd("ls -la;sleep 10;ls")
    queueCmd("mkdir test; sleep 20")

    printCmdQueue()

    pool = Pool(processes=2)

    print "cnt:", +r_server.llen("cmds")
    #while r_server.llen("cmds") > 0:
    while True:
        pool.apply_async(work, callback=boom)
        if not r_server.lrange("cmds", 0, -1):
        #if r_server.llen("cmds") == 0:
            print "Terminate pool"
            pool.terminate()
            break

    printCmdQueue()

First, am I correct in believing that if I need to do any communication to the manager, that I want to do so with a callback? The quick examples I seen on this use store the async call in a result and access it via result.get(timeout=1). And by communication, I mean put stuff back into a redis list.

Edit: if the command is run in async and I timeout on the result inside the main, does that timeout the worker or just that operation inside the manager? If only the manager couldn't I use this to check for exit codes from the worker?

Next, this code produces the following output:

['ls -la;sleep 10;ls', 'mkdir test; sleep 20', 'ls -la;sleep 10;ls', 'mkdir test; sleep 20', 'ls -la;sleep 10;ls', 'mkdir test; sleep 20']
command being consumed:  ['mkdir test; sleep 20', 'ls -la;sleep 10;ls', 'mkdir test; sleep 20', 'ls -la;sleep 10;ls', 'mkdir test; sleep 20']
pop goes the weasel
command being consumed:  ['ls -la;sleep 10;ls', 'mkdir test; sleep 20', 'ls -la;sleep 10;ls', 'mkdir test; sleep 20']
command being consumed:  mkdir test; sleep 20
pop goes the weasel
pop goes the weasel
command being consumed:  ['ls -la;sleep 10;ls', 'mkdir test; sleep 20']
pop goes the weasel
command being consumed:  ['ls -la;sleep 10;ls', 'mkdir test; sleep 20']
command being consumed:  mkdir test; sleep 20
Terminate pool
command being consumed:  None
 pop goes the weasel
pop goes the weasel
pop goes the weasel
[]

Why does the worker want to consume multiple cmds at a time even though I am poping them off one at a time? On a similar not, this doesn't always end nicely and sometimes requires a ctrl+c. To deal with his I clear out the queue and go again. I think this relates to the apply_sync() and if to get out of loop. I am wondering if more needs to happen on the worker side?

If I change the ifs to the one commented out, I get:

ValueError: invalid literal for int() with base 10: 'ls -la;sleep 10;ls'

This seems like it would be a better way to check to see if I need to break but it seems that function is returning a string literal at times?

Any advise on improving this would be much appreciated. I am simply trying to make a manager which will be like a service/daemon on a linux machine. It will be used to get jobs (currently commands but possibly more) from a redis list and returns results back into a redis list. Then down the road a GUI will interact with this manager to get status of queues and return results.

Thanks,

EDIT:

I realized I was being a bit of a goof. I do not need to access the redis server from a worker and that was leading to some errors (specifically the ValueError).

To fix this the loop is now:

while not r_server.llen("cmds") == 0:
    cmd = r_server.lpop("cmds")
    pool.apply_async(work, [cmd])

After these lines I call pool.close(). I used os.getpid() and os.getppid() to check that I did in fact have multiple children running around.

I would still enjoy hearing if this sounds like a good way to create a manager/worker application that uses redis.

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(1

死开点丶别碍眼 2024-11-22 14:27:15

您的问题是您尝试使用单个 redis 连接同时运行多个命令。

您期待类似的结果

Thread 1     Thread 2
LLEN test    
1                            
LPOP test   
command      
             LLEN test
             0

,但您得到的

Thread 1     Thread 2
LLEN test    
1                            
LPOP test   
             LLEN test
             command
0

结果以相同的顺序返回,但没有任何内容将线程或命令链接到特定结果。单个 Redis 连接不是线程安全的 - 每个工作线程都需要一个。

如果您不恰当地使用管道,您也会看到类似的问题 - 它是为只写场景而设计的,例如向列表中添加大量项目,您可以通过假设 LPUSH 成功而不是等待服务器在每次之后告诉您它成功来提高性能物品。 Redis 仍然会返回结果,但它们不一定是最后发送的命令的结果。

除此之外,基本方法是合理的。不过,您可以进行一些增强:

  • 不检查长度,只需使用非阻塞 LPOP - 如果返回 null,则列表为空
  • 添加一个计时器,以便如果列表为空,它将等待,而不仅仅是发出另一个命令。
  • 在 while 循环条件中包含取消检查
  • 处理连接错误 - 我使用设置的外部循环,以便如果连接失败,工作人员将尝试重新连接(基本上重新启动ma​​in)进行合理的尝试次数在完全终止工作进程之前。

Your problem is that you are trying to run multiple commands concurrently with a single redis connection.

You are expecting something like

Thread 1     Thread 2
LLEN test    
1                            
LPOP test   
command      
             LLEN test
             0

but you are getting

Thread 1     Thread 2
LLEN test    
1                            
LPOP test   
             LLEN test
             command
0

The results come back in the same order, but there is nothing linking a thread or command to a specific result. Individual redis connections are not thread safe - you will need one for each worker thread.

You can also see similar problems if you use pipelining inappropriately - it is designed for write only scenarios like adding lots of items to a list, where you can improve performance by assuming LPUSH succeeded rather than waiting for the server to tell you it succeeded after each item. Redis will still return the results, but they will not necessarily be results from the last command sent.

Other than that, the basic approach is reasonable. There are a couple of enhancements you could make though:

  • Rather than checking the length, just use non-blocking LPOP - if it returns null, the list is empty
  • Add a timer so that if the list is empty it will wait rather than just issuing another command.
  • Include a cancellation check in the while loop condition
  • Handle connection errors - I use an outer loop set up so that if the connection fails the worker will attempt to reconnect (basically restart main) for a reasonable number of attempts before terminating the worker process altogether.
~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文