我如何管理 ruby​​ 线程以便它们完成所有工作?

发布于 2024-11-13 07:15:02 字数 541 浏览 0 评论 0原文

我有一个可以分为独立单元的计算,我现在处理它的方式是创建固定数量的线程,然后移交每个线程中要完成的工作块。因此,在伪代码中,它看起来像这样:

# main thread
work_units.take(10).each {|work_unit| spawn_thread_for work_unit}

def spawn_thread_for(work)
  Thread.new do
    do_some work
    more_work = work_units.pop
    spawn_thread_for more_work unless more_work.nil?
  end
end

基本上,一旦创建了初始数量的线程,每个线程都会执行一些工作,然后继续从工作堆栈中获取要完成的工作,直到没有剩余。当我在 irb 中运行时一切正常,但是当我使用解释器执行脚本时,一切都运行得不太好。我不知道如何让主线程等待所有工作完成。有没有一种好的方法可以做到这一点,或者我是否坚持在主线程中执行 sleep 10 Until work_units.empty?

I have a computation that can be divided into independent units and the way I'm dealing with it now is by creating a fixed number of threads and then handing off chunks of work to be done in each thread. So in pseudo code here's what it looks like

# main thread
work_units.take(10).each {|work_unit| spawn_thread_for work_unit}

def spawn_thread_for(work)
  Thread.new do
    do_some work
    more_work = work_units.pop
    spawn_thread_for more_work unless more_work.nil?
  end
end

Basically once the initial number of threads is created each one does some work and then keeps taking stuff to be done from the work stack until nothing is left. Everything works fine when I run things in irb but when I execute the script using the interpreter things don't work out so well. I'm not sure how to make the main thread wait until all the work is finished. Is there a nice way of doing this or am I stuck with executing sleep 10 until work_units.empty? in the main thread

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(5

流年里的时光 2024-11-20 07:15:02

在 ruby​​ 1.9(和 2.0)中,您可以使用 stdlib 中的 ThreadsWait 来实现此目的:

require 'thread'
require 'thwait'

threads = []
threads << Thread.new { }
threads << Thread.new { }
ThreadsWait.all_waits(*threads)

In ruby 1.9 (and 2.0), you can use ThreadsWait from the stdlib for this purpose:

require 'thread'
require 'thwait'

threads = []
threads << Thread.new { }
threads << Thread.new { }
ThreadsWait.all_waits(*threads)
勿挽旧人 2024-11-20 07:15:02

如果修改 spawn_thread_for 以保存对创建的 Thread 的引用,则可以在线程上调用 Thread#join 来等待完成

x = Thread.new { sleep 0.1; print "x"; print "y"; print "z" }
a = Thread.new { print "a"; print "b"; sleep 0.2; print "c" }
x.join # Let the threads finish before
a.join # main thread exits...

: :(

abxyzc

ri Thread.new文档中窃取。有关更多详细信息,请参阅ri Thread.join文档。)

因此,如果您修改spawn_thread_for 保存线程引用,你可以加入它们:(

未经测试,但应该有味道)

# main thread
work_units = Queue.new # and fill the queue...

threads = []
10.downto(1) do
  threads << Thread.new do
    loop do
      w = work_units.pop
      Thread::exit() if w.nil?
      do_some_work(w)
    end
  end
end

# main thread continues while work threads devour work

threads.each(&:join)

If you modify spawn_thread_for to save a reference to your created Thread, then you can call Thread#join on the thread to wait for completion:

x = Thread.new { sleep 0.1; print "x"; print "y"; print "z" }
a = Thread.new { print "a"; print "b"; sleep 0.2; print "c" }
x.join # Let the threads finish before
a.join # main thread exits...

produces:

abxyzc

(Stolen from the ri Thread.new documentation. See the ri Thread.join documentation for some more details.)

So, if you amend spawn_thread_for to save the Thread references, you can join on them all:

(Untested, but ought to give the flavor)

# main thread
work_units = Queue.new # and fill the queue...

threads = []
10.downto(1) do
  threads << Thread.new do
    loop do
      w = work_units.pop
      Thread::exit() if w.nil?
      do_some_work(w)
    end
  end
end

# main thread continues while work threads devour work

threads.each(&:join)
客…行舟 2024-11-20 07:15:02
Thread.list.each{ |t| t.join unless t == Thread.current }
Thread.list.each{ |t| t.join unless t == Thread.current }
要走就滚别墨迹 2024-11-20 07:15:02

看起来您正在复制 Parallel Each (Peach) 库提供的内容。

It seems like you are replicating what the Parallel Each (Peach) library provides.

软甜啾 2024-11-20 07:15:02

您可以使用 Thread#join

join(p1 = v1) public

调用线程将暂停执行并继续运行。在 thr 退出或超过限制秒数之前不会返回。如果超时则返回nil,否则返回thr。

您还可以使用 Enumerable#each_slice 批量迭代工作单元

work_units.each_slice(10) do |batch|
  # handle each work unit in a thread
  threads = batch.map do |work_unit|
    spawn_thread_for work_unit
  end

  # wait until current batch work units finish before handling the next batch
  threads.each(&:join)
end

You can use Thread#join

join(p1 = v1) public

The calling thread will suspend execution and run thr. Does not return until thr exits or until limit seconds have passed. If the time limit expires, nil will be returned, otherwise thr is returned.

Also you can use Enumerable#each_slice to iterate over the work units in batches

work_units.each_slice(10) do |batch|
  # handle each work unit in a thread
  threads = batch.map do |work_unit|
    spawn_thread_for work_unit
  end

  # wait until current batch work units finish before handling the next batch
  threads.each(&:join)
end
~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文