Using named pipes in bash - data loss problem
I did some searching online and found simple 'tutorials' on using named pipes. However, when I do anything with background jobs I seem to lose a lot of data.
[[Edit: I found a much simpler solution; see my reply below. So the question I put forward is now academic - kept in case anyone wants a working job server]]
Using Ubuntu 10.04 with Linux 2.6.32-25-generic #45-Ubuntu SMP Sat Oct 16 19:52:42 UTC 2010 x86_64 GNU/Linux
GNU bash, version 4.1.5(1)-release (x86_64-pc-linux-gnu).
My bash function is:
function jqs
{
    pipe=/tmp/__job_control_manager__
    trap "rm -f $pipe; exit" EXIT SIGKILL

    if [[ ! -p "$pipe" ]]; then
        mkfifo "$pipe"
    fi

    while true
    do
        if read txt <"$pipe"
        then
            echo "$(date +'%Y'): new text is [[$txt]]"
            if [[ "$txt" == 'quit' ]]
            then
                break
            fi
        fi
    done
}
I run this in the background:
> jqs&
[1] 5336
And now I feed it:
for i in 1 2 3 4 5 6 7 8
do
    (echo aaa$i > /tmp/__job_control_manager__ && echo success$i &)
done
The output is inconsistent.
I frequently don't get all the success echoes.
I get at most as many new-text echoes as success echoes, sometimes fewer.
If I remove the '&' from the 'feed', it seems to work, but I am blocked until the output is read. Hence I want the sub-processes to block, but not the main process.
The aim is to write a simple job control script so I can run, say, at most 10 jobs in parallel, queue the rest for later processing, and reliably know that they do run.
Full job manager below:
function jq_manage
{
    export __gn__="$1"
    pipe=/tmp/__job_control_manager_"$__gn__"__

    trap "rm -f $pipe" EXIT
    trap "break" SIGKILL

    if [[ ! -p "$pipe" ]]; then
        mkfifo "$pipe"
    fi

    while true
    do
        date
        jobs
        if (($(jobs | egrep "Running.*echo '%#_Group_#%_$__gn__'" | wc -l) < $__jN__))
        then
            echo "Waiting for new job"
            if read new_job <"$pipe"
            then
                echo "new job is [[$new_job]]"
                if [[ "$new_job" == 'quit' ]]
                then
                    break
                fi
                echo "In group $__gn__, starting job $new_job"
                eval "(echo '%#_Group_#%_$__gn__' > /dev/null; $new_job) &"
            fi
        else
            sleep 3
        fi
    done
}

function jq
{
    # __gn__ = first parameter to this function, the job group name (the pool within which to allocate __jN__ jobs)
    # __jN__ = second parameter to this function, the maximum number of jobs to run concurrently

    export __gn__="$1"
    shift
    export __jN__="$1"
    shift

    export __jq__=$(jobs | egrep "Running.*echo '%#_GroupQueue_#%_$__gn__'" | wc -l)
    if (($__jq__ < 1))
    then
        eval "(echo '%#_GroupQueue_#%_$__gn__' > /dev/null; jq_manage $__gn__) &"
    fi

    pipe=/tmp/__job_control_manager_"$__gn__"__
    echo $@ >$pipe
}
Calling
jq <name> <max processes> <command>
jq abc 2 sleep 20
will start one process.
That part works fine. Starting a second one works fine too.
One by one, by hand, they seem to work fine.
But starting 10 in a loop seems to lose the system, as in the simpler example above.
Any hints as to what I can do to solve this apparent loss of IPC data would be greatly appreciated.
Regards,
Alain.
Answers (6):
Your problem is the if statement below. What is happening is that your job queue server is opening and closing the pipe each time around the loop. This means that some of the clients get a "broken pipe" error when they try to write to the pipe - that is, the reader of the pipe goes away after the writer opens it.
To fix this, change the loop in the server so that the pipe is opened once for the entire loop:
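Something along these lines (a sketch adapted from the jqs loop above; possibly not the exact code from the original answer):

while true
do
    if read txt
    then
        echo "$(date +'%Y'): new text is [[$txt]]"
        if [[ "$txt" == 'quit' ]]
        then
            break
        fi
    fi
done < "$pipe"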
Done this way, the pipe is opened once and kept open.
You will need to be careful of what you run inside the loop, as all processing inside the loop will have stdin attached to the named pipe. You will want to make sure you redirect stdin of all your processes inside the loop from somewhere else, otherwise they may consume the data from the pipe.
Edit: With the problem now being that you get EOF on your reads when the last client closes the pipe, you can use jilles' method of duping the file descriptors, or you can just make sure you are a client too and keep the write side of the pipe open:
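For example (a sketch of the one-line change), redirect an extra write descriptor on the same loop:

done < "$pipe" 3> "$pipe"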
This will hold the write side of the pipe open on fd 3. The same caveat applies to this file descriptor as to stdin: you will need to close it so that child processes don't inherit it. It probably matters less than with stdin, but it would be cleaner.
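For instance (illustrative only; some_command is just a placeholder for whatever job the loop launches):

some_command 3<&- &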
As said in other answers you need to keep the fifo open at all times to avoid losing data.
However, once all writers have gone away after the fifo has been opened (so there was a writer), reads return immediately (and poll() returns POLLHUP). The only way to clear this state is to reopen the fifo.

POSIX does not provide a solution to this, but at least Linux and FreeBSD do: if reads start failing, open the fifo again while keeping the original descriptor open. This works because in Linux and FreeBSD the "hangup" state is local to a particular open file description, while in POSIX it is global to the fifo.
This can be done in a shell script like this:
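A minimal sketch of that approach (an assumed reconstruction, not necessarily the script from the original answer; it reuses the fifo path from the question):

pipe=/tmp/__job_control_manager__
[[ -p "$pipe" ]] || mkfifo "$pipe"
exec 3< "$pipe"                    # initial open of the read side
while true
do
    if read -u 3 txt
    then
        echo "new text is [[$txt]]"
        [[ "$txt" == 'quit' ]] && break
    else
        # all writers are gone: open the fifo again on a new descriptor
        # while fd 3 is still open, then swap it in to clear the hangup
        exec 4< "$pipe"
        exec 3<&- 3<&4 4<&-
    fi
done
exec 3<&-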
Just for those who might be interested, [[re-edited]] following comments by camh and jilles, here are two new versions of the test server script.
Both versions now work exactly as hoped.
camh's version for pipe management:
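A sketch of the camh-style server (the original script was not preserved here, so this is an assumed reconstruction): the loop's stdin is redirected from the fifo once, and fd 3 holds the write side open so the reader never sees EOF.

function jqs
{
    pipe=/tmp/__job_control_manager__
    trap "rm -f $pipe; exit" EXIT

    if [[ ! -p "$pipe" ]]; then
        mkfifo "$pipe"
    fi

    while true
    do
        if read txt
        then
            echo "$(date +'%Y'): new text is [[$txt]]"
            if [[ "$txt" == 'quit' ]]
            then
                break
            fi
        fi
    done < "$pipe" 3> "$pipe"
}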
jilles's version for pipe management:
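Again an assumed reconstruction, this time reading from a numbered descriptor and reopening the fifo whenever read fails, as described in jilles's answer:

function jqs
{
    pipe=/tmp/__job_control_manager__
    trap "rm -f $pipe; exit" EXIT

    if [[ ! -p "$pipe" ]]; then
        mkfifo "$pipe"
    fi

    exec 3< "$pipe"
    while true
    do
        if read -u 3 txt
        then
            echo "$(date +'%Y'): new text is [[$txt]]"
            if [[ "$txt" == 'quit' ]]
            then
                break
            fi
        else
            exec 4< "$pipe"        # reopen before closing to clear the hangup state
            exec 3<&- 3<&4 4<&-
        fi
    done
    exec 3<&-
}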
Thanks to all for your help.
As camh & Dennis Williamson say, don't break the pipe.
Now I have smaller examples, direct on the command line:
Server:
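Something along these lines (an assumed reconstruction of the snippet), with the whole read loop redirected from the fifo and run in the background:

mkfifo /tmp/__job_control_manager__
while true; do read txt; echo "[[$txt]]"; done < /tmp/__job_control_manager__ &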
Client:
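And for the client, a plain loop of writes (again an assumed sketch):

for i in 1 2 3 4 5 6 7 8; do echo aaa$i > /tmp/__job_control_manager__; done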
Can replace the key line with:
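Presumably the backgrounded variant of the write, i.e. something like:

(echo aaa$i > /tmp/__job_control_manager__ &)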
All client data sent to the pipe gets read, though with option two of the client one may need to start the server a couple of times before all data is read.
But although the read waits for data in the pipe to start with, once data has been pushed, it reads the empty string forever.
Any way to stop this?
Thanks for any insights again.
On the one hand the problem is worse than I thought:
Now there seems to be a case in my more complex example (jq_manage) where the same data is being read over and over again from the pipe (even though no new data is being written to it).
On the other hand, I found a simple solution (edited following Dennis' comment):
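A minimal sketch of the shape of such a pipe-free throttle, reusing the jobs-counting trick from jq_manage above (this is an assumed illustration, not necessarily the code from the original post):

function jq
{
    __gn__="$1"; shift      # job group name (as in the earlier version)
    __jN__="$1"; shift      # maximum number of concurrent jobs in the group

    # block until fewer than __jN__ tagged jobs from this group are running
    while (( $(jobs | egrep "Running.*echo '%#_Group_#%_$__gn__'" | wc -l) >= __jN__ ))
    do
        sleep 1
    done

    eval "(echo '%#_Group_#%_$__gn__' > /dev/null; $@) &"
}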
Works like a charm.
No socket or pipe involved.
Simple.
You can do this with GNU Parallel. You will not need this scripting.
http://www.gnu.org/software/parallel/man.html#options
You can set --max-procs ("Number of jobslots. Run up to N jobs in parallel."). There is an option to set the number of CPU cores you want to use. You can save the list of executed jobs to a log file, but that is a beta feature.
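For example (illustrative only; assumes GNU Parallel is installed), running at most 10 jobs at a time and logging what was executed:

seq 1 40 | parallel --max-procs 10 --joblog /tmp/parallel_jobs.log 'sleep 2; echo job {}'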