在 bash 中使用命名管道 - 数据丢失问题

发布于 2024-10-04 20:45:40 字数 2662 浏览 9 评论 0原文

在网上进行了一些搜索,找到了使用命名管道的简单“教程”。然而,当我对后台作业执行任何操作时,我似乎丢失了很多数据。

[[编辑:找到了一个更简单的解决方案,请参阅帖子回复。所以我提出的问题现在是学术性的 - 万一有人可能想要一个工作服务器]]

Using Ubuntu 10.04 with Linux 2.6.32-25-generic #45-Ubuntu SMP Sat Oct 16 19:52:42 UTC 2010 x86_64 GNU/ Linux

GNU bash,版本 4.1.5(1)-release (x86_64-pc-linux-gnu)。

我的 bash 函数是:

function jqs
{
  pipe=/tmp/__job_control_manager__
  trap "rm -f $pipe; exit"  EXIT SIGKILL

  if [[ ! -p "$pipe" ]]; then
      mkfifo "$pipe"
  fi

  while true
  do
    if read txt <"$pipe"
    then
      echo "$(date +'%Y'): new text is [[$txt]]"

      if [[ "$txt" == 'quit' ]]
      then
    break
      fi
    fi
  done
}

我在后台运行它:

> jqs&
[1] 5336

现在我输入它:

for i in 1 2 3 4 5 6 7 8
do
  (echo aaa$i > /tmp/__job_control_manager__ && echo success$i &)
done

输出不一致。 我经常得不到所有成功的回声。 我收到的新文本回显最多与成功回显一样多,有时甚至更少。

如果我删除“&”从“提要”来看,它似乎有效,但在读取输出之前我被阻止。因此我想让子进程被阻止,而不是主进程。

目的是编写一个简单的作业控制脚本,这样我最多可以并行运行 10 个作业,并将其余作业排队以供稍后处理,但可以可靠地知道它们确实在运行。

下面是完整的作业管理器:

function jq_manage
{
  export __gn__="$1"

  pipe=/tmp/__job_control_manager_"$__gn__"__
  trap "rm -f $pipe"    EXIT
  trap "break"      SIGKILL

  if [[ ! -p "$pipe" ]]; then
      mkfifo "$pipe"
  fi

  while true
  do
    date
    jobs
    if (($(jobs | egrep "Running.*echo '%#_Group_#%_$__gn__'" | wc -l) < $__jN__))
    then
      echo "Waiting for new job"
      if read new_job <"$pipe"
      then
    echo "new job is [[$new_job]]"

    if [[ "$new_job" == 'quit' ]]
    then
      break
    fi

    echo "In group $__gn__, starting job $new_job"
    eval "(echo '%#_Group_#%_$__gn__' > /dev/null; $new_job) &"
      fi
    else
      sleep 3
    fi
  done
}

function jq
{
  # __gn__ = first parameter to this function, the job group name (the pool within which to allocate __jN__ jobs)
  # __jN__ = second parameter to this function, the maximum of job numbers to run concurrently

  export __gn__="$1"
  shift
  export __jN__="$1"
  shift

  export __jq__=$(jobs | egrep "Running.*echo '%#_GroupQueue_#%_$__gn__'" | wc -l)
  if (($__jq__ '<' 1))
  then
    eval "(echo '%#_GroupQueue_#%_$__gn__' > /dev/null; jq_manage $__gn__) &"
  fi

  pipe=/tmp/__job_control_manager_"$__gn__"__

  echo $@ >$pipe
}

调用

jq <name> <max processes> <command>
jq abc 2 sleep 20

将启动一个进程。 那部分工作正常。开始第二个吧,好吧。 一个接一个的手工似乎效果很好。 但循环启动 10 似乎会丢失系统,如上面更简单的示例所示。

任何有关我可以采取哪些措施来解决 IPC 数据明显丢失问题的提示将不胜感激。

问候, 阿兰.

Did some search online, found simple 'tutorials' to use named pipes. However when I do anything with background jobs I seem to lose a lot of data.

[[Edit: found a much simpler solution, see reply to post. So the question I put forward is now academic - in case one might want a job server]]

Using Ubuntu 10.04 with Linux 2.6.32-25-generic #45-Ubuntu SMP Sat Oct 16 19:52:42 UTC 2010 x86_64 GNU/Linux

GNU bash, version 4.1.5(1)-release (x86_64-pc-linux-gnu).

My bash function is:

function jqs
{
  pipe=/tmp/__job_control_manager__
  trap "rm -f $pipe; exit"  EXIT SIGKILL

  if [[ ! -p "$pipe" ]]; then
      mkfifo "$pipe"
  fi

  while true
  do
    if read txt <"$pipe"
    then
      echo "$(date +'%Y'): new text is [[$txt]]"

      if [[ "$txt" == 'quit' ]]
      then
    break
      fi
    fi
  done
}

I run this in the background:

> jqs&
[1] 5336

And now I feed it:

for i in 1 2 3 4 5 6 7 8
do
  (echo aaa$i > /tmp/__job_control_manager__ && echo success$i &)
done

The output is inconsistent.
I frequently don't get all success echoes.
I get at most as many new text echos as success echoes, sometimes less.

If I remove the '&' from the 'feed', it seems to work, but I am blocked until the output is read. Hence me wanting to let sub-processes get blocked, but not the main process.

The aim being to write a simple job control script so I can run say 10 jobs in parallel at most and queue the rest for later processing, but reliably know that they do run.

Full job manager below:

function jq_manage
{
  export __gn__="$1"

  pipe=/tmp/__job_control_manager_"$__gn__"__
  trap "rm -f $pipe"    EXIT
  trap "break"      SIGKILL

  if [[ ! -p "$pipe" ]]; then
      mkfifo "$pipe"
  fi

  while true
  do
    date
    jobs
    if (($(jobs | egrep "Running.*echo '%#_Group_#%_$__gn__'" | wc -l) < $__jN__))
    then
      echo "Waiting for new job"
      if read new_job <"$pipe"
      then
    echo "new job is [[$new_job]]"

    if [[ "$new_job" == 'quit' ]]
    then
      break
    fi

    echo "In group $__gn__, starting job $new_job"
    eval "(echo '%#_Group_#%_$__gn__' > /dev/null; $new_job) &"
      fi
    else
      sleep 3
    fi
  done
}

function jq
{
  # __gn__ = first parameter to this function, the job group name (the pool within which to allocate __jN__ jobs)
  # __jN__ = second parameter to this function, the maximum of job numbers to run concurrently

  export __gn__="$1"
  shift
  export __jN__="$1"
  shift

  export __jq__=$(jobs | egrep "Running.*echo '%#_GroupQueue_#%_$__gn__'" | wc -l)
  if (($__jq__ '<' 1))
  then
    eval "(echo '%#_GroupQueue_#%_$__gn__' > /dev/null; jq_manage $__gn__) &"
  fi

  pipe=/tmp/__job_control_manager_"$__gn__"__

  echo $@ >$pipe
}

Calling

jq <name> <max processes> <command>
jq abc 2 sleep 20

will start one process.
That part works fine. Start a second one, fine.
One by one by hand seem to work fine.
But starting 10 in a loop seems to lose the system, as in the simpler example above.

Any hints as to what I can do to solve this apparent loss of IPC data would be greatly appreciated.

Regards,
Alain.

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(6

橙味迷妹 2024-10-11 20:45:40

您的问题是下面的 if 语句:

while true
do
    if read txt <"$pipe"
    ....
done

发生的情况是您的作业队列服务器每次循环时都会打开和关闭管道。这意味着某些客户端在尝试写入管道时会收到“损坏的管道”错误 - 也就是说,管道的读取器在写入器打开后消失。

要解决此问题,请更改服务器中的循环,在整个循环中打开管道一次:

while true
do
    if read txt
    ....
done < "$pipe"

这样做,管道将打开一次并保持打开状态。

您需要小心循环内运行的内容,因为循环内的所有处理都会将 stdin 附加到命名管道。您需要确保从其他地方重定向循环内所有进程的标准输入,否则它们可能会消耗管道中的数据。

编辑:现在的问题是,当最后一个客户端关闭管道时,您在读取时会收到 EOF,您可以使用 jilles 方法来复制文件描述符,或者您可以确保您也是客户端并保留写入端管道的打开状态:

while true
do
    if read txt
    ....
done < "$pipe" 3> "$pipe"

这将使管道的写入端在 fd 3 上保持打开状态。与 stdin 一样,此文件描述符也适用相同的警告。您需要关闭它,以便任何子进程都不会继承它。它可能不如标准输入那么重要,但它会更干净。

Your problem is if statement below:

while true
do
    if read txt <"$pipe"
    ....
done

What is happening is that your job queue server is opening and closing the pipe each time around the loop. This means that some of the clients are getting a "broken pipe" error when they try to write to the pipe - that is, the reader of the pipe goes away after the writer opens it.

To fix this, change your loop in the server open the pipe once for the entire loop:

while true
do
    if read txt
    ....
done < "$pipe"

Done this way, the pipe is opened once and kept open.

You will need to be careful of what you run inside the loop, as all processing inside the loop will have stdin attached to the named pipe. You will want to make sure you redirect stdin of all your processes inside the loop from somewhere else, otherwise they may consume the data from the pipe.

Edit: With the problem now being that you are getting EOF on your reads when the last client closes the pipe, you can use jilles method of duping the file descriptors, or you can just make sure you are a client too and keep the write side of the pipe open:

while true
do
    if read txt
    ....
done < "$pipe" 3> "$pipe"

This will hold the write side of the pipe open on fd 3. The same caveat applies with this file descriptor as with stdin. You will need to close it so any child processes dont inherit it. It probably matters less than with stdin, but it would be cleaner.

自在安然 2024-10-11 20:45:40

正如其他答案中所述,您需要始终保持 fifo 打开以避免丢失数据。

但是,一旦所有写入器在 fifo 打开后都离开(因此存在写入器),读取就会立即返回(并且 poll() 返回 POLLHUP)。清除此状态的唯一方法是重新打开 fifo。

POSIX 没有提供解决方案,但至少 Linux 和 FreeBSD 提供了:如果读取开始失败,请再次打开 fifo,同时保持原始描述符打开。这是可行的,因为在 Linux 和 FreeBSD 中,“hangup”状态对于特定的打开文件描述来说是本地的,而在 POSIX 中,它对于 fifo 来说是全局的。

这可以在 shell 脚本中完成,如下所示:

while :; do
    exec 3<tmp/testfifo
    exec 4<&-
    while read x; do
        echo "input: $x"
    done <&3
    exec 4<&3
    exec 3<&-
done

As said in other answers you need to keep the fifo open at all times to avoid losing data.

However, once all writers have left after the fifo has been open (so there was a writer), reads return immediately (and poll() returns POLLHUP). The only way to clear this state is to reopen the fifo.

POSIX does not provide a solution to this but at least Linux and FreeBSD do: if reads start failing, open the fifo again while keeping the original descriptor open. This works because in Linux and FreeBSD the "hangup" state is local to a particular open file description, while in POSIX it is global to the fifo.

This can be done in a shell script like this:

while :; do
    exec 3<tmp/testfifo
    exec 4<&-
    while read x; do
        echo "input: $x"
    done <&3
    exec 4<&3
    exec 3<&-
done
天赋异禀 2024-10-11 20:45:40

对于那些可能感兴趣的人,[[重新编辑]]根据 camh 和 jilles 的评论,这里有两个新版本的测试服务器脚本。

现在,两个版本都完全按照预期工作。

camh 的管道管理版本:

function jqs    # Job queue manager
{
  pipe=/tmp/__job_control_manager__
  trap "rm -f $pipe; exit"  EXIT TERM

  if [[ ! -p "$pipe" ]]; then
      mkfifo "$pipe"
  fi

  while true
  do
    if read -u 3 txt
    then
      echo "$(date +'%Y'): new text is [[$txt]]"

      if [[ "$txt" == 'quit' ]]
      then
    break
      else
        sleep 1
        # process $txt - remember that if this is to be a spawned job, we should close fd 3 and 4 beforehand
      fi
    fi
  done 3< "$pipe" 4> "$pipe"    # 4 is just to keep the pipe opened so any real client does not end up causing read to return EOF
}

jille 的管道管理版本:

function jqs    # Job queue manager
{
  pipe=/tmp/__job_control_manager__
  trap "rm -f $pipe; exit"  EXIT TERM

  if [[ ! -p "$pipe" ]]; then
      mkfifo "$pipe"
  fi

  exec 3< "$pipe"
  exec 4<&-

  while true
  do
    if read -u 3 txt
    then
      echo "$(date +'%Y'): new text is [[$txt]]"

      if [[ "$txt" == 'quit' ]]
      then
    break
      else
        sleep 1
        # process $txt - remember that if this is to be a spawned job, we should close fd 3 and 4 beforehand
      fi
    else
      # Close the pipe and reconnect it so that the next read does not end up returning EOF
      exec 4<&3
      exec 3<&-
      exec 3< "$pipe"
      exec 4<&-
    fi
  done
}

感谢大家的帮助。

Just for those that might be interested, [[re-edited]] following comments by camh and jilles, here are two new versions of the test server script.

Both versions now works exactly as hoped.

camh's version for pipe management:

function jqs    # Job queue manager
{
  pipe=/tmp/__job_control_manager__
  trap "rm -f $pipe; exit"  EXIT TERM

  if [[ ! -p "$pipe" ]]; then
      mkfifo "$pipe"
  fi

  while true
  do
    if read -u 3 txt
    then
      echo "$(date +'%Y'): new text is [[$txt]]"

      if [[ "$txt" == 'quit' ]]
      then
    break
      else
        sleep 1
        # process $txt - remember that if this is to be a spawned job, we should close fd 3 and 4 beforehand
      fi
    fi
  done 3< "$pipe" 4> "$pipe"    # 4 is just to keep the pipe opened so any real client does not end up causing read to return EOF
}

jille's version for pipe management:

function jqs    # Job queue manager
{
  pipe=/tmp/__job_control_manager__
  trap "rm -f $pipe; exit"  EXIT TERM

  if [[ ! -p "$pipe" ]]; then
      mkfifo "$pipe"
  fi

  exec 3< "$pipe"
  exec 4<&-

  while true
  do
    if read -u 3 txt
    then
      echo "$(date +'%Y'): new text is [[$txt]]"

      if [[ "$txt" == 'quit' ]]
      then
    break
      else
        sleep 1
        # process $txt - remember that if this is to be a spawned job, we should close fd 3 and 4 beforehand
      fi
    else
      # Close the pipe and reconnect it so that the next read does not end up returning EOF
      exec 4<&3
      exec 3<&-
      exec 3< "$pipe"
      exec 4<&-
    fi
  done
}

Thanks to all for your help.

旧时光的容颜 2024-10-11 20:45:40

就像 camh &丹尼斯·威廉姆森说不要打破管道。

现在我有更小的例子,直接在命令行上:

服务器:

(
  for i in {0,1,2,3,4}{0,1,2,3,4,5,6,7,8,9};
  do
    if read s;
      then echo ">>$i--$s//";
    else
      echo "<<$i";
    fi;
  done < tst-fifo
)&

客户端:

(
  for i in {%a,#b}{1,2}{0,1};
  do
    echo "Test-$i" > tst-fifo;
  done
)&

可以将关键行替换为:

    (echo "Test-$i" > tst-fifo&);

读取发送到管道的所有客户端数据,尽管使用客户端的选项二可能需要启动服务器读取所有数据之前的时间。

但是,尽管读取等待管道中的数据开始,但一旦数据被推送,它就会永远读取空字符串。

有什么办法可以阻止这个吗?

再次感谢您的任何见解。

Like camh & Dennis Williamson say don't break the pipe.

Now I have smaller examples, direct on the command line:

Server:

(
  for i in {0,1,2,3,4}{0,1,2,3,4,5,6,7,8,9};
  do
    if read s;
      then echo ">>$i--$s//";
    else
      echo "<<$i";
    fi;
  done < tst-fifo
)&

Client:

(
  for i in {%a,#b}{1,2}{0,1};
  do
    echo "Test-$i" > tst-fifo;
  done
)&

Can replace the key line with:

    (echo "Test-$i" > tst-fifo&);

All client data sent to the pipe gets read, though with option two of the client one may need to start the server a couple of times before all data is read.

But although the read waits for data in the pipe to start with, once data has been pushed, it reads the empty string forever.

Any way to stop this?

Thanks for any insights again.

晨光如昨 2024-10-11 20:45:40

一方面,问题比我想象的更严重:
现在,在我更复杂的示例(jq_manage)中似乎有一种情况,即从管道中一遍又一遍地读取相同的数据(即使没有新数据写入其中)。

另一方面,我找到了一个简单的解决方案(根据丹尼斯的评论进行编辑):

function jqn    # compute the number of jobs running in that group
{
  __jqty__=$(jobs | egrep "Running.*echo '%#_Group_#%_$__groupn__'" | wc -l)
}

function jq
{
  __groupn__="$1";  shift   # job group name (the pool within which to allocate $__jmax__ jobs)
  __jmax__="$1";    shift   # maximum of job numbers to run concurrently

  jqn
  while (($__jqty__ '>=' $__jmax__))
  do
    sleep 1
    jqn
  done

  eval "(echo '%#_Group_#%_$__groupn__' > /dev/null; $@) &"
}

就像魅力一样。
不涉及套接字或管道。
简单的。

On the one hand the problem is worse than I thought:
Now there seems to be a case in my more complex example (jq_manage) where the same data is being read over and over again from the pipe (even though no new data is being written to it).

On the other hand, I found a simple solution (edited following Dennis' comment):

function jqn    # compute the number of jobs running in that group
{
  __jqty__=$(jobs | egrep "Running.*echo '%#_Group_#%_$__groupn__'" | wc -l)
}

function jq
{
  __groupn__="$1";  shift   # job group name (the pool within which to allocate $__jmax__ jobs)
  __jmax__="$1";    shift   # maximum of job numbers to run concurrently

  jqn
  while (($__jqty__ '>=' $__jmax__))
  do
    sleep 1
    jqn
  done

  eval "(echo '%#_Group_#%_$__groupn__' > /dev/null; $@) &"
}

Works like a charm.
No socket or pipe involved.
Simple.

北斗星光 2024-10-11 20:45:40

最多并行运行 10 个作业,并将其余作业排队以供稍后处理,但可靠地知道它们确实在运行

。您可以使用 GNU Parallel 来做到这一点。您不需要此脚本。

http://www.gnu.org/software/parallel/man.html#options

您可以设置 max-procs“作业槽数。并行运行最多 N 个作业。”有一个选项可以设置您要使用的 CPU 核心数。您可以将已执行作业的列表保存到日志文件中,但这是一个测试版功能。

run say 10 jobs in parallel at most and queue the rest for later processing, but reliably know that they do run

You can do this with GNU Parallel. You will not need a this scripting.

http://www.gnu.org/software/parallel/man.html#options

You can set max-procs "Number of jobslots. Run up to N jobs in parallel." There is an option to set the number of CPU cores you want to use. You can save the list of executed jobs to a log file, but that is a beta feature.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文