Python & Redis: Manager/Worker application best practices
I have a few general questions about using Python and Redis to create a job queue application for running asynchronous commands. Here is the code I have generated so far:
import redis
from multiprocessing import Pool

def queueCmd(cmd):
    r_server.rpush("cmds", cmd)

def printCmdQueue():
    print r_server.lrange("cmds", 0, -1)

def work():
    print "command being consumed: ", r_server.lpop("cmds")
    return -1

def boom(info):
    print "pop goes the weasel"

if __name__ == '__main__':
    r_server = redis.Redis("localhost")
    queueCmd("ls -la;sleep 10;ls")
    queueCmd("mkdir test; sleep 20")
    queueCmd("ls -la;sleep 10;ls")
    queueCmd("mkdir test; sleep 20")
    queueCmd("ls -la;sleep 10;ls")
    queueCmd("mkdir test; sleep 20")
    printCmdQueue()

    pool = Pool(processes=2)
    print "cnt:", r_server.llen("cmds")
    #while r_server.llen("cmds") > 0:
    while True:
        pool.apply_async(work, callback=boom)
        if not r_server.lrange("cmds", 0, -1):
        #if r_server.llen("cmds") == 0:
            print "Terminate pool"
            pool.terminate()
            break
    printCmdQueue()
First, am I correct in believing that if I need to do any communication back to the manager, I should do so with a callback? The quick examples I've seen store the handle returned by the async call and access it via result.get(timeout=1). By communication, I mean putting results back into a redis list.
Edit: if the command is run asynchronously and I time out on the result inside the main process, does that time out the worker or just that wait inside the manager? If it's only the manager, couldn't I use this to check for exit codes from the worker?
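Regarding that edit: the timeout on AsyncResult.get() only cancels the manager's wait; the worker process keeps running, and its return value is still retrievable afterwards, so it can serve as an exit code. A minimal Python 3 sketch illustrating this (no Redis involved; slow_job and run_demo are names invented here):

```python
import multiprocessing
import time

def slow_job(x):
    time.sleep(2)          # stands in for a long-running command
    return x * 2           # the value the manager can inspect afterwards

def run_demo():
    timed_out = False
    with multiprocessing.Pool(processes=1) as pool:
        result = pool.apply_async(slow_job, (21,))
        try:
            result.get(timeout=0.1)   # only the manager's wait times out...
        except multiprocessing.TimeoutError:
            timed_out = True
        value = result.get()          # ...the worker finished anyway
    return timed_out, value

if __name__ == "__main__":
    print(run_demo())   # (True, 42)
```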
Next, this code produces the following output:
['ls -la;sleep 10;ls', 'mkdir test; sleep 20', 'ls -la;sleep 10;ls', 'mkdir test; sleep 20', 'ls -la;sleep 10;ls', 'mkdir test; sleep 20']
command being consumed: ['mkdir test; sleep 20', 'ls -la;sleep 10;ls', 'mkdir test; sleep 20', 'ls -la;sleep 10;ls', 'mkdir test; sleep 20']
pop goes the weasel
command being consumed: ['ls -la;sleep 10;ls', 'mkdir test; sleep 20', 'ls -la;sleep 10;ls', 'mkdir test; sleep 20']
command being consumed: mkdir test; sleep 20
pop goes the weasel
pop goes the weasel
command being consumed: ['ls -la;sleep 10;ls', 'mkdir test; sleep 20']
pop goes the weasel
command being consumed: ['ls -la;sleep 10;ls', 'mkdir test; sleep 20']
command being consumed: mkdir test; sleep 20
Terminate pool
command being consumed: None
pop goes the weasel
pop goes the weasel
pop goes the weasel
[]
Why does the worker want to consume multiple cmds at a time even though I am popping them off one at a time? On a similar note, this doesn't always end cleanly and sometimes requires a ctrl+c. To deal with this I clear out the queue and go again. I think this relates to apply_async() and the if used to get out of the loop. I am wondering if more needs to happen on the worker side?
If I change the if to the commented-out one, I get:
ValueError: invalid literal for int() with base 10: 'ls -la;sleep 10;ls'
This seems like a better way to check whether I need to break, but it appears that function sometimes returns a string rather than an int?
Any advice on improving this would be much appreciated. I am simply trying to make a manager that runs like a service/daemon on a Linux machine. It will get jobs (currently commands, but possibly more) from a redis list and return results to a redis list. Down the road, a GUI will interact with this manager to get queue status and retrieve results.
Thanks,
EDIT:
I realized I was being a bit of a goof. I do not need to access the redis server from a worker and that was leading to some errors (specifically the ValueError).
To fix this, the loop is now:

    while not r_server.llen("cmds") == 0:
        cmd = r_server.lpop("cmds")
        pool.apply_async(work, [cmd])

After these lines I call pool.close(). I used os.getpid() and os.getppid() to check that I did in fact have multiple children running around.
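For reference, that fixed pattern (manager pops, workers only execute) can be sketched end to end in Python 3. The names work and drain are illustrative, and a plain list stands in for the Redis "cmds" list so the sketch runs without a server; with Redis, the manager would lpop exactly as shown in the comments:

```python
import subprocess
from multiprocessing import Pool

def work(cmd):
    # Worker: run the shell command; no Redis access needed here.
    completed = subprocess.run(cmd, shell=True, capture_output=True, text=True)
    return (cmd, completed.returncode)

def drain(cmds, processes=2):
    results = []
    with Pool(processes=processes) as pool:
        while cmds:                  # with Redis: while r_server.llen("cmds") != 0
            cmd = cmds.pop(0)        # with Redis: cmd = r_server.lpop("cmds")
            pool.apply_async(work, (cmd,), callback=results.append)
        pool.close()
        pool.join()                  # all callbacks have run by the time join returns
    return results

if __name__ == "__main__":
    print(drain(["echo hello", "true", "false"]))
```

From here, the callback is a natural place to rpush each (cmd, returncode) pair onto a results list in Redis.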
I would still enjoy hearing if this sounds like a good way to create a manager/worker application that uses redis.
Comments (1)
Your problem is that you are trying to run multiple commands concurrently with a single redis connection.
You are expecting each worker's LPOP to come back with its own single command, but you are getting replies that belong to other calls sharing the connection (which is why a worker sometimes prints an entire list).
The results come back in the same order, but there is nothing linking a thread or command to a specific result. Individual redis connections are not thread safe - you will need one for each worker thread.
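One common way to give each worker its own connection is a Pool initializer (a sketch, not necessarily how the answerer would structure it). A stand-in Connection class records the creating pid so this runs without a Redis server; with redis-py you would create redis.Redis("localhost") inside init_worker instead:

```python
import os
from multiprocessing import Pool

class Connection:            # stand-in for redis.Redis("localhost")
    def __init__(self):
        self.pid = os.getpid()

_conn = None                 # one per worker process, set by the initializer

def init_worker():
    global _conn
    _conn = Connection()     # with redis-py: _conn = redis.Redis("localhost")

def who_owns_connection(_):
    # Each task sees the connection created in its own process.
    return (os.getpid(), _conn.pid)

def run_demo():
    with Pool(processes=2, initializer=init_worker) as pool:
        return pool.map(who_owns_connection, range(4))

if __name__ == "__main__":
    print(run_demo())
```

Every returned pair matches: each worker only ever touches the connection built in its own process, so replies can no longer get crossed.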
You can also see similar problems if you use pipelining inappropriately - it is designed for write-only scenarios, like adding lots of items to a list, where you can improve performance by assuming each LPUSH succeeded rather than waiting for the server to confirm it after every item. Redis will still return the results, but they will not necessarily be results from the last command sent.
Other than that, the basic approach is reasonable. There are a couple of enhancements you could make though:
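One enhancement often applied to this pattern (my assumption, not necessarily what the answer has in mind) is replacing the busy polling loop with a blocking pop, so the manager sleeps until a job arrives instead of spinning. queue.Queue.get(timeout=...) stands in for Redis BLPOP here so the sketch runs without a server:

```python
import queue

def manager_loop(jobs):
    handled = []
    while True:
        try:
            # with redis-py: cmd = r_server.blpop("cmds", timeout=1)
            cmd = jobs.get(timeout=0.2)
        except queue.Empty:
            break            # nothing queued; a real daemon would keep waiting
        handled.append(cmd)  # a real manager would hand cmd to the pool here
    return handled

if __name__ == "__main__":
    q = queue.Queue()
    for c in ["ls -la", "mkdir test"]:
        q.put(c)
    print(manager_loop(q))   # ['ls -la', 'mkdir test']
```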