How to dynamically add items to a PowerShell ArrayList and process them recursively using a runspace pool?

Posted on 2025-02-09 21:49:35

I have a `for` loop that iterates through an ArrayList and, during the process, adds more items to the list and processes them as well (iteratively). I am trying to convert this function to run concurrently using a RunspacePool.

Here is the plain code, without runspaces:

$array = [System.Collections.ArrayList]@(1, 2, 3, 4, 5)
Write-Host "Number of items in array before loop: $($array.Count)"
for ($i = 0; $i -lt $array.Count; $i++) {
    Write-Host "Counter: $i`tArray: $array"
    if ($array[$i] -in @(1, 2, 3, 4, 5)) {
        $array.Add($array[$i] + 3) | Out-Null
    }
}
Write-Host "Array: $array"
Write-Host "Number of items in array after loop: $($array.Count)"

Output is:

Number of items in array before loop: 5
Counter: 0      Array: 1 2 3 4 5
Counter: 1      Array: 1 2 3 4 5 4
Counter: 2      Array: 1 2 3 4 5 4 5
Counter: 3      Array: 1 2 3 4 5 4 5 6
Counter: 4      Array: 1 2 3 4 5 4 5 6 7
Counter: 5      Array: 1 2 3 4 5 4 5 6 7 8
Counter: 6      Array: 1 2 3 4 5 4 5 6 7 8 7
Counter: 7      Array: 1 2 3 4 5 4 5 6 7 8 7 8
Counter: 8      Array: 1 2 3 4 5 4 5 6 7 8 7 8
Counter: 9      Array: 1 2 3 4 5 4 5 6 7 8 7 8
Counter: 10     Array: 1 2 3 4 5 4 5 6 7 8 7 8
Counter: 11     Array: 1 2 3 4 5 4 5 6 7 8 7 8
Array: 1 2 3 4 5 4 5 6 7 8 7 8
Number of items in array after loop: 12

Here is the Runspace function that I am trying to implement:

$pool = [RunspaceFactory]::CreateRunspacePool(1, 10)
$pool.Open()
$runspaces = @()

$scriptblock = {
    Param ($i, $array)
    # Start-Sleep 1 # <------ Output varies significantly if this is enabled
    Write-Output "Current value: $i`tArray: $array"
    if ($i -in @(1, 2, 3, 4, 5)) {
        $array.Add($i + 3) | Out-Null
    }
}

$array = [System.Collections.ArrayList]::Synchronized(([System.Collections.ArrayList]$(1, 2, 3, 4, 5)))
Write-Host "Number of items in array before loop: $($array.Count)"
for ($i = 0; $i -lt $array.Count; $i++) {
    $runspace = [PowerShell]::Create().AddScript($scriptblock).AddArgument($array[$i]).AddArgument($array)
    $runspace.RunspacePool = $pool
    $runspaces += [PSCustomObject]@{ Pipe = $runspace; Status = $runspace.BeginInvoke() }
}

while ($runspaces.Status -ne $null) {
    $completed = $runspaces | Where-Object { $_.Status.IsCompleted -eq $true }
    foreach ($runspace in $completed) {
        $runspace.Pipe.EndInvoke($runspace.Status)
        $runspace.Status = $null
    }
}
Write-Host "array: $array"
Write-Host "Number of items in array after loop: $($array.Count)"
$pool.Close()
$pool.Dispose()

Output without `Start-Sleep` is as expected:

Number of items in array before loop: 5
Current value: 1        Array: 1 2 3 4 5
Current value: 2        Array: 1 2 3 4 5 4
Current value: 3        Array: 1 2 3 4 5 4 5
Current value: 4        Array: 1 2 3 4 5 4 5 6
Current value: 5        Array: 1 2 3 4 5 4 5 6 7
Current value: 4        Array: 1 2 3 4 5 4 5 6 7 8
Current value: 5        Array: 1 2 3 4 5 4 5 6 7 8 7
Current value: 6        Array: 1 2 3 4 5 4 5 6 7 8 7
Current value: 7        Array: 1 2 3 4 5 4 5 6 7 8 7
Current value: 8        Array: 1 2 3 4 5 4 5 6 7 8 7
Current value: 7        Array: 1 2 3 4 5 4 5 6 7 8 7 8
Current value: 8        Array: 1 2 3 4 5 4 5 6 7 8 7 8
Array: 1 2 3 4 5 4 5 6 7 8 7 8
Number of items in array after loop: 12

Output with `Start-Sleep 1` enabled:

Number of items in array before loop: 5
Current value: 1        Array: 1 2 3 4 5
Current value: 2        Array: 1 2 3 4 5 4
Current value: 3        Array: 1 2 3 4 5 4 5
Current value: 4        Array: 1 2 3 4 5 4 5 6
Current value: 5        Array: 1 2 3 4 5 4 5 6 7
Array: 1 2 3 4 5 4 5 6 7 8
Number of items in array after loop: 10

I understand that this happens because the `for` loop exits before the sleep completes, so only the first 5 items are ever submitted to the runspace pool.

Is there a way to add more items to the ArrayList dynamically and still process them concurrently using runspaces?

冷月断魂刀 2025-02-16 21:49:35

您的“工作”行为的核心是,PowerShell是运行您的“非睡眠” scriptBlocks比在的中创建更快循环,因此循环在到达阵列结束之前,就看到了以前的迭代添加的新项目。结果,它必须在项目退出并将其移至循环时,必须处理所有项目的所有。

当您添加 start-sleep 时,它改变了平衡,并且运行 scriptblocks比创建它们,所以循环到达数组的末尾,在最早的迭代添加新项目之前,循环到达了数组的末端。

以下脚本通过将您的和组合到循环中,以在(i)创建新线程和(ii)检查它们是否完成,并且仅退出之间,将其组合为> 循环来解决此问题。 所有完成工作。

很难

$scriptblock = {
    Param ($i, $array)
    # random sleep to simulate variable-length workloads. this is
    # more likely to flush out error conditions than a fixed sleep 
    # period as threads will finish out-of-turn more often
    Start-Sleep (Get-Random -Minimum 1 -Maximum 10)
    Write-Output "$i value: $array"
    if ($i -in @(1, 2, 3, 4, 5)) {
        $array.Add($i + 3) | Out-Null
    }
}

$pool = [RunspaceFactory]::CreateRunspacePool(1, 10)
$pool.Open()

# note - your "$runspaces" variable is misleading as you're creating 
# "PowerShell" objects, and a "Runspace" is a different thing entirely,
# so I've called it $instances instead
# see https://learn.microsoft.com/en-us/dotnet/api/system.management.automation.powershell?view=powershellsdk-7.0.0
#  vs https://learn.microsoft.com/en-us/dotnet/api/system.management.automation.runspaces.runspace?view=powershellsdk-7.0.0
$instances = @()

$array = [System.Collections.ArrayList]::Synchronized(([System.Collections.ArrayList]$(1, 2, 3, 4, 5)))
Write-Host "Number of items in array before loop: $($array.Count)"

while( $true )
{

    # start PowerShell instances for any items in $array that don't already have one.
    # on the first iteration this will seed the initial instances, and in
    # subsequent iterations it will create new instances for items added to
    # $array since the last iteration.
    while( $instances.Length -lt $array.Count )
    {
        $instance = [PowerShell]::Create().AddScript($scriptblock).AddArgument($array[$instances.Length]).AddArgument($array);
        $instance.RunspacePool = $pool
        $instances += [PSCustomObject]@{ Value = $instance; Status = $instance.BeginInvoke() }
    }

    # watch out because there's a race condition here. it'll need very unlucky 
    # timing, *but* an instance might have added an item to $array just after
    # the while loop finished, but before the next line runs, so there *could* 
    # be an item in $array that hasn't had an instance created for it even
    # if all the current instances have completed

    # is there any more work to do? (try to mitigate the race condition
    # by checking again for any items in $array that don't have an instance
    # created for them)
    $active = @( $instances | Where-Object { -not $_.Status.IsCompleted } )
    if( ($active.Length -eq 0) -and ($instances.Length -eq $array.Count) )
    {
        # instances have been created for every item in $array,
        # *and* they've run to completion, so there's no more work to do
        break;
    }

    # if there are incomplete instances, wait for a short time to let them run
    # (this is to avoid a "busy wait" - https://en.wikipedia.org/wiki/Busy_waiting)
    Start-Sleep -Milliseconds 250;

}

# all the instances have completed, so end them
foreach ($instance in $instances)
{
    $instance.Value.EndInvoke($instance.Status);
}

Write-Host "array: $array"
Write-Host "Number of items in array after loop: $($array.Count)"
$pool.Close()
$pool.Dispose()

但是,

Number of items in array before loop: 5
1 value: 1 2 3 4 5 6 5 7
2 value: 1 2 3 4 5 6
3 value: 1 2 3 4 5
4 value: 1 2 3 4 5 6 5
5 value: 1 2 3 4 5 6 5 7 4
6 value: 1 2 3 4 5 6 5 7
5 value: 1 2 3 4 5 6 5 7 4 8
7 value: 1 2 3 4 5 6 5 7
4 value: 1 2 3 4 5 6 5 7 4 8 8
8 value: 1 2 3 4 5 6 5 7 4 8 8
8 value: 1 2 3 4 5 6 5 7 4 8 8
7 value: 1 2 3 4 5 6 5 7 4 8 8 7

多线程 根据 $ scriptblock 中随机睡眠的长度的不同。

可能会做出其他改进,但这至少似乎有效...

The reason your "working" version works is that PowerShell ran your "non-sleep" scriptblocks faster than the `for` loop could create them, so the loop saw the new items added by earlier scriptblocks before it reached the end of the array. As a result, it processed all of the items before it exited and moved on to the `while` loop.

Adding `Start-Sleep` shifted the balance: running the scriptblocks now took longer than creating them, so the `for` loop reached the end of the array before the earliest scriptblocks had added their new items.

The following script fixes this by combining your `for` and `while` loops into a single loop that repeatedly alternates between (i) creating new threads and (ii) checking whether they've finished, and only exits when all the work is done.

However, multi-threading is hard, so it's best to assume I've made mistakes somewhere and test properly before you release this into your live workflow.

$scriptblock = {
    Param ($i, $array)
    # random sleep to simulate variable-length workloads. this is
    # more likely to flush out error conditions than a fixed sleep 
    # period as threads will finish out-of-turn more often
    Start-Sleep (Get-Random -Minimum 1 -Maximum 10)
    Write-Output "$i value: $array"
    if ($i -in @(1, 2, 3, 4, 5)) {
        $array.Add($i + 3) | Out-Null
    }
}

$pool = [RunspaceFactory]::CreateRunspacePool(1, 10)
$pool.Open()

# note - your "$runspaces" variable is misleading as you're creating 
# "PowerShell" objects, and a "Runspace" is a different thing entirely,
# so I've called it $instances instead
# see https://learn.microsoft.com/en-us/dotnet/api/system.management.automation.powershell?view=powershellsdk-7.0.0
#  vs https://learn.microsoft.com/en-us/dotnet/api/system.management.automation.runspaces.runspace?view=powershellsdk-7.0.0
$instances = @()

$array = [System.Collections.ArrayList]::Synchronized(([System.Collections.ArrayList]$(1, 2, 3, 4, 5)))
Write-Host "Number of items in array before loop: $($array.Count)"

while( $true )
{

    # start PowerShell instances for any items in $array that don't already have one.
    # on the first iteration this will seed the initial instances, and in
    # subsequent iterations it will create new instances for items added to
    # $array since the last iteration.
    while( $instances.Length -lt $array.Count )
    {
        $instance = [PowerShell]::Create().AddScript($scriptblock).AddArgument($array[$instances.Length]).AddArgument($array);
        $instance.RunspacePool = $pool
        $instances += [PSCustomObject]@{ Value = $instance; Status = $instance.BeginInvoke() }
    }

    # watch out because there's a race condition here. it'll need very unlucky 
    # timing, *but* an instance might have added an item to $array just after
    # the while loop finished, but before the next line runs, so there *could* 
    # be an item in $array that hasn't had an instance created for it even
    # if all the current instances have completed

    # is there any more work to do? (try to mitigate the race condition
    # by checking again for any items in $array that don't have an instance
    # created for them)
    $active = @( $instances | Where-Object { -not $_.Status.IsCompleted } )
    if( ($active.Length -eq 0) -and ($instances.Length -eq $array.Count) )
    {
        # instances have been created for every item in $array,
        # *and* they've run to completion, so there's no more work to do
        break;
    }

    # if there are incomplete instances, wait for a short time to let them run
    # (this is to avoid a "busy wait" - https://en.wikipedia.org/wiki/Busy_waiting)
    Start-Sleep -Milliseconds 250;

}

# all the instances have completed, so collect their output and dispose of them
foreach ($instance in $instances)
{
    $instance.Value.EndInvoke($instance.Status);
    $instance.Value.Dispose();
}

Write-Host "array: $array"
Write-Host "Number of items in array after loop: $($array.Count)"
$pool.Close()
$pool.Dispose()

Example output:

Number of items in array before loop: 5
1 value: 1 2 3 4 5 6 5 7
2 value: 1 2 3 4 5 6
3 value: 1 2 3 4 5
4 value: 1 2 3 4 5 6 5
5 value: 1 2 3 4 5 6 5 7 4
6 value: 1 2 3 4 5 6 5 7
5 value: 1 2 3 4 5 6 5 7 4 8
7 value: 1 2 3 4 5 6 5 7
4 value: 1 2 3 4 5 6 5 7 4 8 8
8 value: 1 2 3 4 5 6 5 7 4 8 8
8 value: 1 2 3 4 5 6 5 7 4 8 8
7 value: 1 2 3 4 5 6 5 7 4 8 8 7

Note that the order of items in the array will vary depending on the length of the random sleeps in `$scriptblock`.

There are probably additional improvements that could be made, but this at least seems to work...

蓝海 2025-02-16 21:49:35

This answer attempts to provide a better solution to the producer-consumer problem using a `BlockingCollection<T>`, which implements the producer/consumer pattern.

To clarify the issue with my previous answer, as the OP noted in a comment:

If the starting count of the queue (say, 2) is less than the max number of threads (say 5), then only that many (2, in this case) threads remain active no matter how many ever items are added to the queue later. Only the starting number of threads process the rest of the items in the queue. In my case, the starting count is usually one. Then I make a irm (alias for Invoke-RestMethod) request, and add some 10~20 items. These are processed by only the first thread. The other threads go to Completed state right at the start. Is there a solution to this?

In this example, the runspaces use the `TryTake(out T, TimeSpan)` overload, which blocks the thread for up to the specified timeout while waiting for an item. On each loop iteration, each runspace also records its `TryTake(..)` result in a synchronized Hashtable.
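A minimal single-threaded sketch of the `TryTake(out T, TimeSpan)` behaviour this design relies on, assuming a 500 ms timeout for the demo: on an empty collection the call waits up to the timeout and returns `$false`; once an item is available it returns `$true` immediately and writes the item into the `[ref]` variable.

```powershell
using namespace System.Collections.Concurrent

$bc   = [BlockingCollection[int]]::new()
$item = 0

# empty collection: TryTake waits for up to 500 ms, then gives up and returns $false
$sw        = [System.Diagnostics.Stopwatch]::StartNew()
$tookEmpty = $bc.TryTake([ref] $item, [timespan]::FromMilliseconds(500))
$sw.Stop()
"Empty collection: TryTake returned $tookEmpty after $($sw.ElapsedMilliseconds) ms"

# non-empty collection: TryTake returns $true at once and fills the [ref] variable
$bc.Add(42)
$tookItem = $bc.TryTake([ref] $item, [timespan]::FromMilliseconds(500))
"Non-empty collection: TryTake returned $tookItem, item = $item"

$bc.Dispose()
```

The elapsed time printed for the empty case will vary slightly from run to run, but it should be close to the timeout.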

The main thread uses the synchronized Hashtable to wait until all runspaces have reported a `$false` status; when that happens, it signals the threads to exit by calling `.CompleteAdding()`.
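The exit signal works because `BlockingCollection<T>` distinguishes `IsAddingCompleted` (no more items may be added) from `IsCompleted` (no more items may be added and the collection is empty), and the worker loop checks `IsCompleted`. A minimal single-threaded sketch of those semantics follows; it also shows that `Add` throws once `.CompleteAdding()` has been called, which is why the signal should only be sent when no thread can still produce items.

```powershell
using namespace System.Collections.Concurrent

$bc = [BlockingCollection[int]]::new()
$bc.Add(1)
$bc.CompleteAdding()

# adding is closed, but one item is still waiting to be consumed
$addingClosed    = $bc.IsAddingCompleted
$completedBefore = $bc.IsCompleted
"Before draining: IsAddingCompleted = $addingClosed, IsCompleted = $completedBefore"

# further Add calls are rejected (InvalidOperationException)
$addFailed = $false
try { $bc.Add(2) } catch { $addFailed = $true }
"Add after CompleteAdding failed: $addFailed"

# taking the last item makes the collection completed
$item = 0
[void] $bc.TryTake([ref] $item)
$completedAfter = $bc.IsCompleted
"After draining: IsCompleted = $completedAfter"

$bc.Dispose()
```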

Although not perfect, this solves the problem of threads exiting the loop early, and it attempts to ensure that all threads end at the same time (when there are no more items in the collection).

The producer logic is very similar to the previous answer; however, in this case each thread waits a random number of seconds, from `$timeout.Seconds - 5` (inclusive) up to `$timeout.Seconds + 5` (exclusive), on each loop iteration.

The results one can expect from this demo can be found on this gist.
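One detail in the locking code is order-sensitive: `Monitor.PulseAll` may only be called while the calling thread still owns the lock, so in a release helper it must come before `Monitor.Exit`. A minimal check of both orders on a single thread (the `$sync` object is just a demo lock):

```powershell
$sync = [object]::new()

# correct order: signal waiters while the lock is still held, then release it
[System.Threading.Monitor]::Enter($sync)
[System.Threading.Monitor]::PulseAll($sync)
[System.Threading.Monitor]::Exit($sync)
'PulseAll while holding the lock: OK'

# wrong order: after Exit, this thread no longer owns the lock
$errorType = $null
try {
    [System.Threading.Monitor]::PulseAll($sync)
}
catch {
    # PowerShell wraps .NET exceptions; GetBaseException() returns the original one
    $errorType = $_.Exception.GetBaseException().GetType().Name
}
"PulseAll after Exit throws: $errorType"
```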

using namespace System.Management.Automation.Runspaces
using namespace System.Collections.Concurrent
using namespace System.Threading

try {
    $threads = 20
    $bc      = [BlockingCollection[int]]::new()
    $status  = [hashtable]::Synchronized(@{ TotalCount = 0 })

    # set a timer, all threads will wait for it before exiting
    # this timespan should be tweaked depending on the task at hand
    $timeout = [timespan]::FromSeconds(5)

    foreach($i in 1, 2, 3, 4, 5) {
        $bc.Add($i)
    }


    $scriptblock = {
        param([timespan] $timeout, [int] $threads)

        $id = [runspace]::DefaultRunspace
        $status[$id.InstanceId] = $true
        $syncRoot = $status.SyncRoot
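        $release  = {
            # PulseAll must be called while this thread still owns the lock
            # (otherwise it throws a SynchronizationLockException), then release it
            [Threading.Monitor]::PulseAll($syncRoot)
            [Threading.Monitor]::Exit($syncRoot)
        }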

        # will use this to simulate random delays
        $min = $timeout.Seconds - 5
        $max = $timeout.Seconds + 5

        [ref] $target = $null
        while(-not $bc.IsCompleted) {
            # NOTE from `Hashtable.Synchronized(Hashtable)` MS Docs:
            #
            #    The Synchronized method is thread safe for multiple readers and writers.
            #    Furthermore, the synchronized wrapper ensures that there is only
            #    one writer writing at a time.
            #
            #    Enumerating through a collection is intrinsically not a
            #    thread-safe procedure. Even when a collection is synchronized,
            #    other threads can still modify the collection, which causes the
            #    enumerator to throw an exception.

            # Mainly doing this (lock on the sync hash) to get the Active Count
            # Not really needed and only for demo purposes

            # if we can't lock on this object in 200ms go next iteration
            if(-not [Threading.Monitor]::TryEnter($syncRoot, 200)) {
                continue
            }

            # if there are no items in queue, send `$false` to the main thread
            if(-not ($status[$id.InstanceId] = $bc.TryTake($target, $timeout))) {
                # release the lock and signal the threads they can get a handle
                & $release
                # and go next iteration
                continue
            }

            # if there was an item in queue, get the active count
            $active = @($status.Values -eq $true).Count
            # add 1 to the total count
            $status['TotalCount'] += 1
            # and release the lock
            & $release

            Write-Host (
                ('Target Value: {0}' -f $target.Value).PadRight(20) + '|'.PadRight(5) +
                ('Items in Queue: {0}' -f $bc.Count).PadRight(20)   + '|'.PadRight(5) +
                ('Runspace Id: {0}' -f $id.Id).PadRight(20)         + '|'.PadRight(5) +
                ('Active Runspaces [{0:D2} / {1:D2}]' -f $active, $threads)
            )

            $ran = [random]::new()
            # start a simulated delay
            Start-Sleep $ran.Next($min, $max)

            # get a random number between 0 and 10
            $ran = $ran.Next(11)
            # if the number is greater than the Dequeued Item
            if ($ran -gt $target.Value) {
                # enumerate starting from `$ran - 2` up to `$ran`
                foreach($i in ($ran - 2)..$ran) {
                    # enqueue each item
                    $bc.Add($i)
                }
            }

            # Send 1 to the Success Stream, this will help us check
            # if the test succeeded later on
            1
        }
    }

    $iss    = [initialsessionstate]::CreateDefault2()
    $rspool = [runspacefactory]::CreateRunspacePool(1, $threads, $iss, $Host)
    $rspool.ApartmentState = [ApartmentState]::STA
    $rspool.ThreadOptions  = [PSThreadOptions]::UseNewThread
    $rspool.InitialSessionState.Variables.Add([SessionStateVariableEntry[]]@(
        [SessionStateVariableEntry]::new('bc', $bc, 'Producer Consumer Collection')
        [SessionStateVariableEntry]::new('status', $status, 'Monitoring hash for signaling `.CompleteAdding()`')
    ))
    $rspool.Open()

    $params = @{
        Timeout = $timeout
        Threads = $threads
    }

    $rs = for($i = 0; $i -lt $threads; $i++) {
        # the pool supplies the runspace, so no per-instance InitialSessionState is needed
        $ps = [powershell]::Create().AddScript($scriptblock).AddParameters($params)
        $ps.RunspacePool = $rspool

        @{
            Instance    = $ps
            AsyncResult = $ps.BeginInvoke()
        }
    }

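    # wait until every thread has registered in $status ('TotalCount' plus one key
    # per runspace), then until all of them report that TryTake timed out
    while($status.Count -le $threads -or $status.ContainsValue($true)) {
        Start-Sleep -Milliseconds 200
    }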

    # send signal to stop
    $bc.CompleteAdding()

    [int[]] $totalCount = foreach($r in $rs) {
        try {
            $r.Instance.EndInvoke($r.AsyncResult)
            $r.Instance.Dispose()
        }
        catch {
            Write-Error $_
        }
    }
    Write-Host ("`nTotal Count [ IN {0} / OUT {1} ]" -f $totalCount.Count, $status['TotalCount'])
    Write-Host ("Items in Queue: {0}" -f $bc.Count)
    Write-Host ("Test Succeeded: {0}" -f (
        [Linq.Enumerable]::Sum($totalCount) -eq $status['TotalCount'] -and
        $bc.Count -eq 0
    ))
}
finally {
    ($bc, $rspool).ForEach('Dispose')
}
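The timeout-based exit above is a heuristic. One common alternative, sketched here under stated assumptions (a `System.Threading.CountdownEvent` counting items that are queued or still being processed, 4 workers, and the OP's deterministic `+ 3` rule as the workload), is to call `.CompleteAdding()` exactly when that count reaches zero. Workers can then simply block on `GetConsumingEnumerable()` with no timeouts or polling. This is a different technique from the answer's code, not a drop-in replacement for it.

```powershell
using namespace System.Collections.Concurrent

# Worker: consume until CompleteAdding() has been called and the collection is empty
$worker = {
    param($bc, $countdown, $results)
    foreach ($item in $bc.GetConsumingEnumerable()) {
        $results.Add($item)
        # the OP's rule: items 1..5 spawn a follow-up item (value + 3)
        if ($item -in 1..5) {
            # count the new item BEFORE signalling this one, so the outstanding
            # count cannot reach zero while work is still being created
            $countdown.AddCount()
            $bc.Add($item + 3)
        }
        # this item is finished; whoever finishes the last outstanding item stops everyone
        if ($countdown.Signal()) { $bc.CompleteAdding() }
    }
}

$seed      = 1..5
$bc        = [BlockingCollection[int]]::new()
$countdown = [System.Threading.CountdownEvent]::new($seed.Count)  # items queued or in progress
$results   = [ConcurrentBag[int]]::new()
foreach ($i in $seed) { $bc.Add($i) }

$workers = 4   # assumed pool size for the demo
$pool = [runspacefactory]::CreateRunspacePool(1, $workers)
$pool.Open()

$jobs = foreach ($n in 1..$workers) {
    $ps = [powershell]::Create().AddScript($worker)
    [void] $ps.AddArgument($bc).AddArgument($countdown).AddArgument($results)
    $ps.RunspacePool = $pool
    [pscustomobject]@{ PS = $ps; Handle = $ps.BeginInvoke() }
}

# EndInvoke blocks until each worker's foreach loop has ended
foreach ($job in $jobs) {
    $job.PS.EndInvoke($job.Handle)
    $job.PS.Dispose()
}
$pool.Dispose()

$sorted = $results.ToArray() | Sort-Object
"Processed $($sorted.Count) items: $($sorted -join ' ')"
$bc.Dispose()
$countdown.Dispose()
```

Because every item is counted before it is added and signalled only after its children are counted, the count reaches zero exactly once, after the last item, so no thread ever calls `Add` after `CompleteAdding`. With the seed `1..5` it processes the same 12 items as the OP's sequential loop, in a nondeterministic order.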
堇色安年 2025-02-16 21:49:35

Note: this answer DOES NOT provide a good solution to the OP's problem. See the `BlockingCollection<T>` answer for a better take on the producer-consumer problem.


This is a different approach from mclayton's helpful answer; hopefully, between the two answers, you can solve your problem. This example uses a `ConcurrentQueue<T>` and consists of multiple threads performing the same action.

As you can see, in this case we start only 5 threads, which try to dequeue the items concurrently.

If a randomly generated number between 0 and 10 is greater than the dequeued item, the thread creates a range from that random number minus 2 up to the random number and enqueues those items. This is a rough attempt to simulate what you posted in the comments: "The actual problem involves Invoke-RestMethod (irm) towards multiple endpoints, based on the results of which, I may have to query more similar endpoints".
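The weak spot of this design is the loop condition: `ConcurrentQueue<T>.TryDequeue` never waits, it returns `$false` as soon as the queue is empty at that instant. A single-threaded sketch, using the OP's deterministic `+ 3` rule instead of random numbers (an assumption, for reproducibility), shows the loop draining items that were enqueued while it ran, in the same order as the OP's sequential output, and `TryDequeue` returning `$false` once nothing is left.

```powershell
using namespace System.Collections.Concurrent

$queue = [ConcurrentQueue[int]]::new()
foreach ($i in 1..5) { $queue.Enqueue($i) }

$target    = 0
$processed = [System.Collections.Generic.List[int]]::new()

# TryDequeue never blocks: the loop ends the moment the queue is empty
while ($queue.TryDequeue([ref] $target)) {
    $processed.Add($target)
    # deterministic stand-in for the random rule: items 1..5 spawn value + 3
    if ($target -in 1..5) { $queue.Enqueue($target + 3) }
}

"Processed $($processed.Count) items: $($processed -join ' ')"
$emptyResult = $queue.TryDequeue([ref] $target)
"TryDequeue on the empty queue returns: $emptyResult"
```

With several threads, a thread that hits that momentarily empty queue exits even if another thread is about to enqueue more work, which is why some threads can finish early in this approach.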

Note that in this example I'm using `$threads = $queue.Count`, but this should not always be the case. Don't start too many threads or you might kill your session! Also be aware that you might overload your network if you query many endpoints at the same time. I would say: always keep the number of threads below `$queue.Count`.

The results of the code below should vary greatly from run to run.

using namespace System.Management.Automation.Runspaces
using namespace System.Collections.Concurrent

try {
    $queue = [ConcurrentQueue[int]]::new()
    foreach($i in 1, 2, 3, 4, 5) {
        $queue.Enqueue($i)
    }
    $threads = $queue.Count

    $scriptblock = {
        [ref] $target = $null
        while($queue.TryDequeue($target)) {
            [pscustomobject]@{
                'Target Value'      = $target.Value
                'Elements in Queue' = $queue.Count
            }

            # get a random number between 0 and 10
            $ran = Get-Random -Maximum 11
            # if the number is greater than the Dequeued Item
            if ($ran -gt $target.Value) {
                # enumerate starting from `$ran - 2` up to `$ran`
                foreach($i in ($ran - 2)..$ran) {
                    # enqueue each item
                    $queue.Enqueue($i)
                }
            }
        }
    }

    $iss    = [initialsessionstate]::CreateDefault2()
    $rspool = [runspacefactory]::CreateRunspacePool(1, $threads, $iss, $Host)
    $rspool.InitialSessionState.Variables.Add([SessionStateVariableEntry]::new(
        'queue', $queue, ''
    ))
    $rspool.Open()

    $rs = for($i = 0; $i -lt $threads; $i++) {
        $ps = [powershell]::Create().AddScript($scriptblock)
        $ps.RunspacePool = $rspool

        @{
            Instance    = $ps
            AsyncResult = $ps.BeginInvoke()
        }
    }

    foreach($r in $rs) {
        try {
            $r.Instance.EndInvoke($r.AsyncResult)
            $r.Instance.Dispose()
        }
        catch {
            Write-Error $_
        }
    }
}
finally {
    $rspool.ForEach('Dispose')
}