How to write a process pool in bash shell

Posted on 2024-11-16 19:12:08

I have more than 10 tasks to execute, and the system restricts me to at most 4 tasks running at the same time.

My task can be started like:
myprog taskname

How can I write a bash shell script to run these tasks? The most important thing is that when one task finishes, the script can start another immediately, keeping the number of running tasks at 4 at all times.
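
A minimal sketch of one way to do this (an assumption, not from the answers below: it relies on bash 4.3+ for wait -n, and on the task names being listed one per line in a hypothetical tasks.txt):

#!/bin/bash
max_jobs=4
while read -r taskname; do
    # if the pool is full, block until any one background job exits
    while (( $(jobs -rp | wc -l) >= max_jobs )); do
        wait -n
    done
    myprog "$taskname" &
done < tasks.txt
wait   # let the remaining jobs finish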

甜妞爱困 2024-11-23 19:12:08

Use xargs:

xargs -P <maximum-number-of-process-at-a-time> -n <arguments-per-process> <command>

Details here.
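
Applied to the question, a sketch (assuming the task names sit one per line in a hypothetical tasks.txt):

# run at most 4 copies of myprog at once, one task name per invocation
xargs -P 4 -n 1 myprog < tasks.txt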

⊕婉儿 2024-11-23 19:12:08

我在考虑编写自己的进程池时偶然发现了这个线程,并且特别喜欢 Brandon Horsley 的解决方案,尽管我无法使信号正常工作,因此我从 Apache 中获得了灵感,并决定尝试使用 fifo 的预分叉模型:我的工作队列。

以下函数是工作进程在分叉时运行的函数。

# \brief the worker function that is called when we fork off worker processes
# \param[in] id  the worker ID
# \param[in] job_queue  the fifo to read jobs from
# \param[in] result_log  the temporary log file to write exit codes to
function _job_pool_worker()
{
    local id=$1
    local job_queue=$2
    local result_log=$3
    local line=

    exec 7<> ${job_queue}
    while [[ "${line}" != "${job_pool_end_of_jobs}" && -e "${job_queue}" ]]; do
        # workers block on the exclusive lock to read the job queue
        flock --exclusive 7
        read line <${job_queue}
        flock --unlock 7
        # the worker should exit if it sees the end-of-job marker or run the
        # job otherwise and save its exit code to the result log.
        if [[ "${line}" == "${job_pool_end_of_jobs}" ]]; then
            # write it one more time for the next sibling so that everyone
            # will know we are exiting.
            echo "${line}" >&7
        else
            _job_pool_echo "### _job_pool_worker-${id}: ${line}"
            # run the job
            { ${line} ; } 
            # now check the exit code and prepend "ERROR" to the result log entry
            # which we will use to count errors and then strip out later.
            local result=$?
            local status=
            if [[ "${result}" != "0" ]]; then
                status=ERROR
            fi  
            # now write the error to the log, making sure multiple processes
            # don't trample over each other.
            exec 8<> ${result_log}
            flock --exclusive 8
            echo "${status}job_pool: exited ${result}: ${line}" >> ${result_log}
            flock --unlock 8
            exec 8>&-
            _job_pool_echo "### _job_pool_worker-${id}: exited ${result}: ${line}"
        fi  
    done
    exec 7>&-
}

您可以在 Github 上获取我的解决方案的副本。这是使用我的实现的示例程序。

#!/bin/bash

. job_pool.sh

function foobar()
{
    # do something
    true
}   

# initialize the job pool to allow 3 parallel jobs and echo commands
job_pool_init 3 0

# run jobs
job_pool_run sleep 1
job_pool_run sleep 2
job_pool_run sleep 3
job_pool_run foobar
job_pool_run foobar
job_pool_run /bin/false

# wait until all jobs complete before continuing
job_pool_wait

# more jobs
job_pool_run /bin/false
job_pool_run sleep 1
job_pool_run sleep 2
job_pool_run foobar

# don't forget to shut down the job pool
job_pool_shutdown

# check the $job_pool_nerrors for the number of jobs that exited non-zero
echo "job_pool_nerrors: ${job_pool_nerrors}"

希望这有帮助!

I chanced upon this thread while looking into writing my own process pool and particularly liked Brandon Horsley's solution, though I couldn't get the signals working right, so I took inspiration from Apache and decided to try a pre-fork model with a fifo as my job queue.

The following function is the function that the worker processes run when forked.

# \brief the worker function that is called when we fork off worker processes
# \param[in] id  the worker ID
# \param[in] job_queue  the fifo to read jobs from
# \param[in] result_log  the temporary log file to write exit codes to
function _job_pool_worker()
{
    local id=$1
    local job_queue=$2
    local result_log=$3
    local line=

    exec 7<> ${job_queue}
    while [[ "${line}" != "${job_pool_end_of_jobs}" && -e "${job_queue}" ]]; do
        # workers block on the exclusive lock to read the job queue
        flock --exclusive 7
        read line <${job_queue}
        flock --unlock 7
        # the worker should exit if it sees the end-of-job marker or run the
        # job otherwise and save its exit code to the result log.
        if [[ "${line}" == "${job_pool_end_of_jobs}" ]]; then
            # write it one more time for the next sibling so that everyone
            # will know we are exiting.
            echo "${line}" >&7
        else
            _job_pool_echo "### _job_pool_worker-${id}: ${line}"
            # run the job
            { ${line} ; } 
            # now check the exit code and prepend "ERROR" to the result log entry
            # which we will use to count errors and then strip out later.
            local result=$?
            local status=
            if [[ "${result}" != "0" ]]; then
                status=ERROR
            fi  
            # now write the error to the log, making sure multiple processes
            # don't trample over each other.
            exec 8<> ${result_log}
            flock --exclusive 8
            echo "${status}job_pool: exited ${result}: ${line}" >> ${result_log}
            flock --unlock 8
            exec 8>&-
            _job_pool_echo "### _job_pool_worker-${id}: exited ${result}: ${line}"
        fi  
    done
    exec 7>&-
}

You can get a copy of my solution at GitHub. Here's a sample program using my implementation.

#!/bin/bash

. job_pool.sh

function foobar()
{
    # do something
    true
}   

# initialize the job pool to allow 3 parallel jobs and echo commands
job_pool_init 3 0

# run jobs
job_pool_run sleep 1
job_pool_run sleep 2
job_pool_run sleep 3
job_pool_run foobar
job_pool_run foobar
job_pool_run /bin/false

# wait until all jobs complete before continuing
job_pool_wait

# more jobs
job_pool_run /bin/false
job_pool_run sleep 1
job_pool_run sleep 2
job_pool_run foobar

# don't forget to shut down the job pool
job_pool_shutdown

# check the $job_pool_nerrors for the number of jobs that exited non-zero
echo "job_pool_nerrors: ${job_pool_nerrors}"

Hope this helps!
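
For the original question, the same API could be used along these lines (a sketch; the task names are placeholders):

. job_pool.sh
# pool of 4 workers, same init arguments as in the sample above
job_pool_init 4 0
for t in taskname1 taskname2 taskname3; do
    job_pool_run myprog "$t"
done
job_pool_wait
job_pool_shutdown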

春风十里 2024-11-23 19:12:08

Using GNU Parallel you can do:

cat tasks | parallel -j4 myprog

If you have 4 cores, you can even just do:

cat tasks | parallel myprog

From http://git.savannah.gnu.org/cgit/parallel.git/tree/README:

Full installation

Full installation of GNU Parallel is as simple as:

./configure && make && make install

Personal installation

If you are not root you can add ~/bin to your path and install in
~/bin and ~/share:

./configure --prefix=$HOME && make && make install

Or if your system lacks 'make' you can simply copy src/parallel
src/sem src/niceload src/sql to a dir in your path.

Minimal installation

If you just need parallel and do not have 'make' installed (maybe the
system is old or Microsoft Windows):

wget http://git.savannah.gnu.org/cgit/parallel.git/plain/src/parallel
chmod 755 parallel
cp parallel sem
mv parallel sem dir-in-your-$PATH/bin/

Test the installation

After this you should be able to do:

parallel -j0 ping -nc 3 ::: foss.org.my gnu.org freenetproject.org

This will send 3 ping packets to 3 different hosts in parallel and print
the output when they complete.

Watch the intro video for a quick introduction:
https://www.youtube.com/playlist?list=PL284C9FF2488BC6D1
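
If the task names are already in a file, say a hypothetical tasks.txt, the pipe can be dropped in favor of parallel's --arg-file option:

parallel -j4 -a tasks.txt myprog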

绿光 2024-11-23 19:12:08

I found the best solution proposed in the A Foo Walks into a Bar... blog, using the built-in functionality of the well-known xargs tool.
First create a file commands.txt with the list of commands you want to execute:

myprog taskname1
myprog taskname2
myprog taskname3
myprog taskname4
...
myprog taskname123

and then pipe it to xargs like this to execute in a pool of 4 processes:

cat commands.txt | xargs -I CMD --max-procs=4 bash -c CMD

You can modify the number of processes via --max-procs.
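
For the original question, commands.txt does not have to be written by hand; a sketch that generates it (the task count 123 is just a placeholder):

for i in $(seq 1 123); do echo "myprog taskname$i"; done > commands.txt
xargs -I CMD --max-procs=4 bash -c CMD < commands.txt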

佼人 2024-11-23 19:12:08

I would suggest writing four scripts, each of which executes a certain number of tasks in series. Then write another script that starts the four scripts in parallel. For instance, if you have scripts script1.sh, script2.sh, script3.sh, and script4.sh, you could have a script called headscript.sh like so.

#!/bin/sh
./script1.sh & 
./script2.sh & 
./script3.sh & 
./script4.sh &
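
The four serial scripts can also be generated instead of maintained by hand; a sketch assuming GNU split and full commands listed one per line in a hypothetical tasks.txt:

# split the task list into 4 line-based chunks, then run each chunk serially
split -n l/4 tasks.txt chunk.
for f in chunk.*; do
    bash "$f" &
done
wait
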
温柔戏命师 2024-11-23 19:12:08

Following @Parag Sardas' answer and the documentation linked, here's a quick script you might want to add to your .bash_aliases.

Relinking the doc link because it's worth a read

#!/bin/bash
# https://stackoverflow.com/a/19618159
# https://stackoverflow.com/a/51861820
#
# Example file contents:
# touch /tmp/a.txt
# touch /tmp/b.txt

if [ "$#" -eq 0 ];  then
  echo "$0 <file> [max-procs=0]"
  exit 1
fi

FILE=${1}
MAX_PROCS=${2:-0}
cat $FILE | while read line; do printf "%q\n" "$line"; done | xargs --max-procs=$MAX_PROCS -I CMD bash -c CMD

E.g. ./xargs-parallel.sh jobs.txt 4 runs with a maximum of 4 processes, reading commands from jobs.txt.

缪败 2024-11-23 19:12:08

You could probably do something clever with signals.

Note this is only to illustrate the concept, and thus not thoroughly tested.

#!/usr/local/bin/bash

this_pid="$"
jobs_running=0
sleep_pid=

# Catch alarm signals to adjust the number of running jobs
trap 'decrement_jobs' SIGALRM

# When a job finishes, decrement the total and kill the sleep process
decrement_jobs()
{
  jobs_running=$(($jobs_running - 1))
  if [ -n "${sleep_pid}" ]
  then
    kill -s SIGKILL "${sleep_pid}"
    sleep_pid=
  fi
}

# Check to see if the max jobs are running, if so sleep until woken
launch_task()
{
  if [ ${jobs_running} -gt 3 ]
  then
    (
      while true
      do
        sleep 999
      done
    ) &
    sleep_pid=$!
    wait ${sleep_pid}
  fi

  # Launch the requested task, signalling the parent upon completion
  (
    "$@"
    kill -s SIGALRM "${this_pid}"
  ) &
  jobs_running=$((${jobs_running} + 1))
}

# Launch all of the tasks, this can be in a loop, etc.
launch_task task1
launch_task task2
...
launch_task task99
情栀口红 2024-11-23 19:12:08

This tested script runs 5 jobs at a time and will restart a new job as soon as one finishes (due to the kill of the sleep 10.9 when we get a SIGCHLD). A simpler version of this could use direct polling (change the sleep 10.9 to sleep 1 and get rid of the trap).

#!/usr/bin/bash

set -o monitor
trap "pkill -P $ -f 'sleep 10\.9' >&/dev/null" SIGCHLD

totaljobs=15
numjobs=5
worktime=10
curjobs=0
declare -A pidlist

dojob()
{
  slot=$1
  time=$(echo "$RANDOM * 10 / 32768" | bc -l)
  echo Starting job $slot with args $time
  sleep $time &
  pidlist[$slot]=`jobs -p %%`
  curjobs=$(($curjobs + 1))
  totaljobs=$(($totaljobs - 1))
}

# start
while [ $curjobs -lt $numjobs -a $totaljobs -gt 0 ]
 do
  dojob $curjobs
 done

# Poll for jobs to die, restarting while we have them
while [ $totaljobs -gt 0 ]
 do
  for ((i=0;$i < $curjobs;i++))
   do
    if ! kill -0 ${pidlist[$i]} >&/dev/null
     then
      dojob $i
      break
     fi
   done
   sleep 10.9 >&/dev/null
 done
wait
记忆消瘦 2024-11-23 19:12:08

The other answer about 4 shell scripts does not fully satisfy me, as it assumes that all tasks take approximately the same time and requires manual setup. But here is how I would improve it.

The main script will create symbolic links to the executables following a certain naming convention. For example,

ln -s executable1 ./01-task.01

The first prefix is for sorting and the suffix identifies the batch (01-04).
Now we spawn 4 shell scripts that take the batch number as input and do something like this:

for t in $(ls ./*-task.$batch | sort); do
   "$t"
   rm "$t"
done
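
The spawning side is left implicit; a minimal sketch of it (assuming the loop above lives in a hypothetical run-batch.sh that takes the batch number as $1):

# launch one serial worker per batch; the four workers run in parallel
for batch in 01 02 03 04; do
    ./run-batch.sh "$batch" &
done
wait
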
叫嚣ゝ 2024-11-23 19:12:08

Here is my solution. The idea is quite simple. I create a fifo as a semaphore, where each line stands for an available resource. When reading the queue, the main process blocks if there is nothing left. And we return the resource after the task is done by simply echoing anything back to the queue.

function task() {
    local task_no="$1"
    # doing the actual task...
    echo "Executing Task ${task_no}"
    # which takes a long time
    sleep 1
}

function execute_concurrently() {
    local tasks="$1"
    local ps_pool_size="$2"

    # create an anonymous fifo as a Semaphore
    local sema_fifo
    sema_fifo="$(mktemp -u)"
    mkfifo "${sema_fifo}"
    exec 3<>"${sema_fifo}"
    rm -f "${sema_fifo}"

    # every 'x' stands for an available resource
    for i in $(seq 1 "${ps_pool_size}"); do
        echo 'x' >&3
    done

    for task_no in $(seq 1 "${tasks}"); do
        read dummy <&3 # blocks until a resource is available
        (
            trap 'echo x >&3' EXIT # returns the resource on exit
            task "${task_no}"
        )&
    done
    wait # wait until all forked tasks have finished
}

execute_concurrently 10 4

The script above will run 10 tasks, 4 of them running concurrently at any time. You can change the $(seq 1 "${tasks}") sequence to the actual task queue you want to run.
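
Adapted to the original question, the dispatch loop could read task names from a file instead (a sketch assuming one name per line in a hypothetical tasks.txt, with fd 3 set up as in execute_concurrently above):

while read -r taskname; do
    read -r dummy <&3          # blocks until a slot is free
    (
        trap 'echo x >&3' EXIT # returns the slot on exit
        myprog "${taskname}"
    ) &
done < tasks.txt
wait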

要走就滚别墨迹 2024-11-23 19:12:08

I made my modifications based on the methods introduced in this Writing a process pool in Bash post.

#!/bin/bash

#set -e   # this doesn't work here for some reason
POOL_SIZE=4   # number of workers running in parallel

#######################################################################
#                            populate jobs                            #
#######################################################################

declare -a jobs

for (( i = 1988; i < 2019; i++ )); do
    jobs+=($i)
done

echo '################################################'
echo '    Launching jobs'
echo '################################################'

parallel() {
    local proc procs jobs cur
    jobs=("$@")         # input jobs array
    declare -a procs=() # processes array
    cur=0               # current job idx

    morework=true
    while $morework; do
        # if process array size < pool size, try forking a new proc
        if [[ "${#procs[@]}" -lt "$POOL_SIZE" ]]; then
            if [[ $cur -lt "${#jobs[@]}" ]]; then
                proc=${jobs[$cur]}
                echo "JOB ID = $cur; JOB = $proc."

                ###############
                # do job here #
                ###############

                sleep 3 &

                # add to current running processes
                procs+=("$!")
                # move to the next job
                ((cur++))
            else
                morework=false
                continue
            fi
        fi

        for n in "${!procs[@]}"; do
            kill -0 "${procs[n]}" 2>/dev/null && continue
            # if process is not running anymore, remove from array
            unset procs[n]
        done
    done
    wait
}

parallel "${jobs[@]}"
我的黑色迷你裙 2024-11-23 19:12:08

xargs with the -P and -L options does the job.
You can extract the idea from the example below:

#!/usr/bin/env bash

workers_pool_size=10

set -e

function doit {
    cmds=""
    for e in 4 8 16; do
        for m in 1 2 3 4 5 6; do
            cmd="python3 ./doit.py --m $m -e $e -m $m"
            cmds="$cmd\n$cmds"
        done
    done
    echo -e "All commands:\n$cmds"
    echo "Workers pool size = $workers_pool_size"
    echo -e "$cmds" | xargs -t -P $workers_pool_size -L 1 time > /dev/null
}

doit
蹲在坟头点根烟 2024-11-23 19:12:08
#! /bin/bash
doSomething() {
    <...>
}

getCompletedThreads() {
    _runningThreads=("$@")

    removableThreads=()
    for pid in "${_runningThreads[@]}"; do
        if ! ps -p $pid > /dev/null; then
            removableThreads+=($pid)
        fi
    done
    echo "$removableThreads"
}

releasePool() {
    while [[ ${#runningThreads[@]} -eq $MAX_THREAD_NO ]]; do
        echo "releasing"
        removableThreads=( $(getCompletedThreads "${runningThreads[@]}") )
        if [ ${#removableThreads[@]} -eq 0 ]; then
            sleep 0.2
        else
            for removableThread in "${removableThreads[@]}"; do
                runningThreads=( ${runningThreads[@]/$removableThread} ) 
            done
            echo "released"
        fi
    done
}

waitAllThreadComplete() {
    while [[ ${#runningThreads[@]} -ne 0 ]]; do
        removableThreads=( $(getCompletedThreads "${runningThreads[@]}") )
        for removableThread in "${removableThreads[@]}"; do
            runningThreads=( ${runningThreads[@]/$removableThread} ) 
        done

        if [ ${#removableThreads[@]} -eq 0 ]; then
            sleep 0.2
        fi
    done
}


MAX_THREAD_NO=10
runningThreads=()
sequenceNo=0

for i in {1..36}; do
    releasePool

    ((sequenceNo++))
    echo "added $sequenceNo"
    doSomething &

    pid=$!
    runningThreads+=($pid)
done

waitAllThreadComplete
完美的未来在梦里 2024-11-23 19:12:08

Look at my implementation of job pool in bash:

#!/bin/bash
#
# Job pool implementation in BASH
# License: Apache 2.0
#

help() {
    echo
    echo "USAGE: $0 {add|wait} <ID> <limit> <command...>"
    echo
    echo "Where:"
    echo "  <ID>       Job pool identifier"
    echo "  <limit>    Job pool size"
    echo "  <command>  Command to run"
    echo
    exit 1
}

pool_cmd=$1
shift
pool_id=$1
shift

if [ "$pool_cmd" = "add" ]; then
    pool_size=$1
    shift
    if [ -z "$pool_id" ] || [ -z "$pool_size" ] || [ $# -eq 0 ]; then
        help
    fi
elif [ "$pool_cmd" = "wait" ]; then
    [ ! -z "$pool_id" ] || help
else
    help
fi

pool_id=$(echo $pool_id | sed 's/\W/_/g')
workdir="/tmp/$(whoami)-jp"
[ -d $workdir ] || mkdir -p $workdir || exit $?
lock_prefix="$workdir/$pool_id"
lock_file="$lock_prefix.$"

lock() {
    # Critical section:
    (
        flock -x 201 || exit 1

        # Wait for other processes to finish
        num_running=0
        for l in $(eval ls "${lock_prefix}.*" 2>/dev/null); do
            if kill -0 $(echo $l | sed 's/.*\.//'); then
                num_running=$(($num_running+1))
            else
                # Remove lock file for non-existent process
                rm -f $l
            fi
        done

        if [ "$pool_cmd" = "wait" ]; then
            [ $num_running -eq 0 ]
            return $?
        elif [ "$pool_cmd" = "add" ]; then
            if [ $num_running -lt $pool_size ]; then
                touch $lock_file
                return 0
            fi
            return 1
        fi

    ) 201>$workdir/.lock
}

unlock() {
    rm -f $lock_file
}

trap "unlock; exit 0" INT TERM EXIT

For example, to run at most 3 processes of cURL when downloading from a lot of URLs, you can wrap your cURL commands as follows:

./jp.sh "My Download Pool" 3 curl http://site1/...
./jp.sh "My Download Pool" 3 curl http://site2/...
./jp.sh "My Download Pool" 3 curl http://site3/...
...