如何将项目动态添加到PowerShell ArrayList并使用Runspace池进行递归处理?
我有一个循环的,该循环通过
arraylist
迭代,在此过程中,将更多项目添加到列表中,并(迭代地)处理。我正在尝试将此功能转换为使用RunSpacePool同时运行。
这是没有runspace的普通代码:
$array = [System.Collections.ArrayList]@(1, 2, 3, 4, 5)
Write-Host "Number of items in array before loop: $($array.Count)"
for ($i = 0; $i -lt $array.Count; $i++) {
Write-Host "Counter: $i`tArray: $array"
if ($array[$i] -in @(1, 2, 3, 4, 5)) {
$array.Add($array[$i] + 3) | Out-Null
}
}
Write-Host "Array: $array"
Write-Host "Number of items in array after loop: $($array.Count)"
输出是:
Number of items in array before loop: 5
Counter: 0 Array: 1 2 3 4 5
Counter: 1 Array: 1 2 3 4 5 4
Counter: 2 Array: 1 2 3 4 5 4 5
Counter: 3 Array: 1 2 3 4 5 4 5 6
Counter: 4 Array: 1 2 3 4 5 4 5 6 7
Counter: 5 Array: 1 2 3 4 5 4 5 6 7 8
Counter: 6 Array: 1 2 3 4 5 4 5 6 7 8 7
Counter: 7 Array: 1 2 3 4 5 4 5 6 7 8 7 8
Counter: 8 Array: 1 2 3 4 5 4 5 6 7 8 7 8
Counter: 9 Array: 1 2 3 4 5 4 5 6 7 8 7 8
Counter: 10 Array: 1 2 3 4 5 4 5 6 7 8 7 8
Counter: 11 Array: 1 2 3 4 5 4 5 6 7 8 7 8
Array: 1 2 3 4 5 4 5 6 7 8 7 8
Number of items in array after loop: 12
这是 runspace函数我正在尝试实现:
$pool = [RunspaceFactory]::CreateRunspacePool(1, 10)
$pool.Open()
$runspaces = @()
$scriptblock = {
Param ($i, $array)
# Start-Sleep 1 # <------ Output varies significantly if this is enabled
Write-Output "$i value: $array"
if ($i -in @(1, 2, 3, 4, 5)) {
$array.Add($i + 3) | Out-Null
}
}
$array = [System.Collections.ArrayList]::Synchronized(([System.Collections.ArrayList]$(1, 2, 3, 4, 5)))
Write-Host "Number of items in array before loop: $($array.Count)"
for ($i = 0; $i -lt $array.Count; $i++) {
$runspace = [PowerShell]::Create().AddScript($scriptblock).AddArgument($array[$i]).AddArgument($array)
$runspace.RunspacePool = $pool
$runspaces += [PSCustomObject]@{ Pipe = $runspace; Status = $runspace.BeginInvoke() }
}
while ($runspaces.Status -ne $null) {
$completed = $runspaces | Where-Object { $_.Status.IsCompleted -eq $true }
foreach ($runspace in $completed) {
$runspace.Pipe.EndInvoke($runspace.Status)
$runspace.Status = $null
}
}
Write-Host "array: $array"
Write-Host "Number of items in array after loop: $($array.Count)"
$pool.Close()
$pool.Dispose()
没有睡眠功能的输出是预期的:
Number of items in array before loop: 5
Current value: 1 Array: 1 2 3 4 5
Current value: 2 Array: 1 2 3 4 5 4
Current value: 3 Array: 1 2 3 4 5 4 5
Current value: 4 Array: 1 2 3 4 5 4 5 6
Current value: 5 Array: 1 2 3 4 5 4 5 6 7
Current value: 4 Array: 1 2 3 4 5 4 5 6 7 8
Current value: 5 Array: 1 2 3 4 5 4 5 6 7 8 7
Current value: 6 Array: 1 2 3 4 5 4 5 6 7 8 7
Current value: 7 Array: 1 2 3 4 5 4 5 6 7 8 7
Current value: 8 Array: 1 2 3 4 5 4 5 6 7 8 7
Current value: 7 Array: 1 2 3 4 5 4 5 6 7 8 7 8
Current value: 8 Array: 1 2 3 4 5 4 5 6 7 8 7 8
Array: 1 2 3 4 5 4 5 6 7 8 7 8
Number of items in array after loop: 12
输出与睡眠:
Number of items in array before loop: 5
Current value: 1 Array: 1 2 3 4 5
Current value: 2 Array: 1 2 3 4 5 4
Current value: 3 Array: 1 2 3 4 5 4 5
Current value: 4 Array: 1 2 3 4 5 4 5 6
Current value: 5 Array: 1 2 3 4 5 4 5 6 7
Array: 1 2 3 4 5 4 5 6 7 8
Number of items in array after loop: 10
我知道这是因为 for 在睡眠时间完成之前退出将5个项目添加到Runspace池中。
是否可以动态地向Arraylist添加更多项目并使用Runspaces同时处理它们?
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论
评论(3)
您的“工作”行为的核心是,PowerShell是运行您的“非睡眠” scriptBlocks比在的
中创建
更快循环,因此循环在到达阵列结束之前,就看到了以前的迭代添加的新项目。结果,它必须在项目退出并将其移至
循环时,必须处理所有项目的所有。当您添加
start-sleep
时,它改变了平衡,并且运行 scriptblocks比创建它们,所以循环到达数组的末尾,在最早的迭代添加新项目之前,循环到达了数组的末端。以下脚本通过将您的和
组合到
循环中,以在(i)创建新线程和(ii)检查它们是否完成,并且仅退出之间,将其组合为> 循环来解决此问题。 所有完成工作。很难
但是,
多线程 根据
$ scriptblock
中随机睡眠的长度的不同。可能会做出其他改进,但这至少似乎有效...
The core of your "working" behaviour is that PowerShell was running your "non-sleep" scriptblocks faster than it could create them in the
for
loop, so the loop was seeing the new items being added by previous iterations before it reached the end of the array. As a result it had to process all of the items before it exited and moved on to thewhile
loop.When you added a
Start-Sleep
it shifted the balance, and it took longer to run the scriptblocks than it did to create them, so thefor
loop reached the end of the array before the new items were added by the earliest iterations.The following script fixes this by combining your
for
andwhile
loops to repeatedly alternate between (i) creating new threads and (ii) checking if they've finished, and only exiting when all the work is done.However multi-threading is hard so it's best to assume I've made mistakes somewhere, and test properly before you release it to your live workflow...
Example output:
Note the order of items in the array will vary depending on the length of the random sleeps in the
$scriptblock
.There are probably additional improvements that could be made, but this at least seems to work...
该答案试图为 nofollow noreferrer“>生产者 - 塑料问题< /a> 使用
blockingCollection&lt; t&gt;
提供了生产者/消费者模式的实现。要澄清我以前的答案,正如OP在评论中指出的那样:
在此示例中,runspaces将使用
trytake(t,timespan) method> method offload
线程并等待指定的超时。在每个循环迭代中,Runspaces还将更新a 同步hashtable
trytake(....)
结果。主线程将使用同步的Hashtable等待,直到所有Runspaces都发送了
$ false
状态,当发生这种情况时,Exit信号将发送到.completeadding()
。即使不是完美的,这也解决了一些问题,其中某些线程可能会从循环提早退出,并且 试图确保所有线程同时结束(当收集中没有更多项目时)< /strong> 。
生产者逻辑将与上一个答案非常相似,但是,在这种情况下,每个线程将在
$ timeout.second.seconds -5
和$ timeout.seconds + 5
在每个循环迭代中。可以通过 <此> < /a>。
This answer attempts to provide a better solution to the producer-consumer problem using a
BlockingCollection<T>
which provides an implementation of the producer/consumer pattern.To clarify on the issue with my previous answer, as OP has noted in a comment:
For this example, the runspaces will be using the
TryTake(T, TimeSpan)
method overload which blocks the thread and waits for the specified timeout. On each loop iteration the runspaces will also be updating a Synchronized Hashtable with theirTryTake(..)
result.The main thread will be using the Synchronized Hashtable to wait until all runspaces had sent a
$false
status, when this happens an exit signal is sent to the threads to with.CompleteAdding()
.Even though not perfect, this solves the problem where some of the threads might exit early from the loop and attempts to ensure that all threads end at the same time (when there are no more items in the collection).
The producer logic will be very similar to the previous answer, however, in this case each thread will wait random amount of time between
$timeout.Seconds - 5
and$timeout.Seconds + 5
on each loop iteration.The results one can expect from this demo can be found on this gist.
请注意,此答案不能为OP问题提供一个很好的解决方案。请参阅此答案最好选择“ nofollow noreferrer”>“ nofollow noreferrer”> producter-consumer-consumer-cosmumer-cosmumer-cosumer-cosumer-comploer 。
这与麦克莱顿的有用答案希望,希望这两个答案都可以使您解决问题。此示例使用a conturrentqueue&lt; t&gt; ,并在多个线程中执行相同的操作。
如您所见,在这种情况下,我们仅启动 5个线程,它将试图同时脱离这些项目。
如果在0到10之间的随机生成的数字大于脱水项目,它将创建一个数组从随机数开始-2到给定的随机数 并招募它们(试图模拟, ,您在评论中发布的内容“实际问题涉及
Invoke-restMethod
(irm
)多个终点,基于其结果,我可能必须查询更多相似的端点“ )。请注意,在此示例中,我正在使用
$ threads = $ queue.count
,,但是情况并非总是如此。不要启动太多线程,否则您可能会杀死您的会话!还要注意,如果同时查询多个端点,您可能会超载网络。我要说,将线程始终低于$ queue.count
。您可以从下面的代码中期望的结果在每个运行时都应有很大差异。
Note, this answer DOES NOT provide a good solution to OP's problem. See this answer for a better take on the producer-consumer problem.
This is a different approach from mclayton's helpful answer, hopefully both answers can lead you to solve your problem. This example uses a
ConcurrentQueue<T>
and consists in multiple threads performing the same action.As you may see, in this case we start only 5 threads that will be trying to dequeue the items concurrently.
If the randomly generated number between 0 and 10 is greater than the dequeued item, it creates an array starting from the random number - 2 up to the given random number and enqueues them (tries to simulate, badly, what you have posted in comments, "The actual problem involves
Invoke-RestMethod
(irm
) towards multiple endpoints, based on the results of which, I may have to query more similar endpoints").Do note, for this example I'm using
$threads = $queue.Count
, however this should not be always the case. Don't start too many threads or you might kill your session! Also be aware you might overload your network if querying multiple endpoints at the same time. I would say, keep the threads always below$queue.Count
.The results you can expect from below code should vary greatly on each runtime.