Ruby 并发/异步处理(带有简单用例)

发布于 2024-09-29 02:56:51 字数 1059 浏览 4 评论 0原文

我正在研究 ruby​​ 的并行/异步处理功能,并阅读了许多文章和博客文章。我浏览了 EventMachine、Fibers、Reactor、Reia 等。不幸的是,我无法为这个非常简单的用例找到一个简单、有效(且非 IO 阻塞)的解决方案:

File.open('somelogfile.txt') do |file|
  while line = file.gets      # (R) Read from IO
    line = process_line(line) # (P) Process the line
    write_to_db(line)         # (W) Write the output to some IO (DB or file)
  end
end

你可以看到吗,我的小脚本正在执行三个操作读取 (R)、处理 (P) 和读取 (R)、处理 (P) 和 (W)。为简单起见,我们假设每个操作恰好需要 1 个时间单位(例如 10 毫秒),因此当前的代码将执行类似这样的操作(5 行):

Time:       123456789012345 (15 units in total)
Operations: RPWRPWRPWRPWRPW

但是,我希望它执行类似这样的操作:

Time:       1234567 (7 units in total)
Operations: RRRRR
             PPPPP
              WWWWW

显然,我可以运行三个进程(读取器、处理器和写入器)并将读取器的读取行传递到处理器队列,然后将处理后的行传递到写入器队列(所有这些都通过例如 RabbitMQ 进行协调)。但是,用例太简单了,感觉不太对劲。

关于如何做到这一点的任何线索(无需从 Ruby 切换到 Erlang、Closure 或 Scala)?

I was looking into ruby's parallel/asynchronous processing capabilities and read many articles and blog posts. I looked through EventMachine, Fibers, Revactor, Reia, etc, etc. Unfortunately, I wasn't able to find a simple, effective (and non-IO-blocking) solution for this very simple use case:

File.open('somelogfile.txt') do |file|
  while line = file.gets      # (R) Read from IO
    line = process_line(line) # (P) Process the line
    write_to_db(line)         # (W) Write the output to some IO (DB or file)
  end
end

Is you can see, my little script is performing three operations read (R), process (P) & write (W). Let's assume - for simplicity - that each operation takes exactly 1 unit of time (e.g. 10ms), the current code would therefore do something like this (5 lines):

Time:       123456789012345 (15 units in total)
Operations: RPWRPWRPWRPWRPW

But, I would like it to do something like this:

Time:       1234567 (7 units in total)
Operations: RRRRR
             PPPPP
              WWWWW

Obviously, I could run three processes (reader, processor & writer) and pass read lines from reader into the processor queue and then pass processed lines into the writer queue (all coordinated via e.g. RabbitMQ). But, the use-case is so simple, it just doesn't feel right.

Any clues on how this could be done (without switching from Ruby to Erlang, Closure or Scala)?

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(2

尸血腥色 2024-10-06 02:56:51

如果您需要它真正并行(来自单个进程),我相信您必须使用 JRuby 来获得真正的本机线程而不需要 GIL。

您可以使用 DRb 之类的东西来跨多个进程/核心分配处理,但对于您的用例来说,这有点多了。相反,您可以尝试让多个进程使用管道进行通信:

$ cat somelogfile.txt | ruby ./proc-process | ruby ./proc-store

在这种情况下,每个部分都是它自己的进程,可以并行运行,但使用 STDIN / STDOUT 进行通信。这可能是解决您的问题最简单(也是最快)的方法。

# proc-process
while line = $stdin.gets do
  # do cpu intensive stuff here
  $stdout.puts "data to be stored in DB"
  $stdout.flush # this is important
end

# proc-store
while line = $stdin.gets do
  write_to_db(line)
end

If you need it to be truly parallel (from a single process) I believe you'll have to use JRuby to get true native threads and no GIL.

You could use something like DRb to distribute the processing across multiple processes / cores, but for your use case this is a bit much. Instead, you could try having multiple processes communicate using pipes:

$ cat somelogfile.txt | ruby ./proc-process | ruby ./proc-store

In this scenario each piece is its own process that can run in parallel but are communicating using STDIN / STDOUT. This is probably the easiest (and quickest) approach to your problem.

# proc-process
while line = $stdin.gets do
  # do cpu intensive stuff here
  $stdout.puts "data to be stored in DB"
  $stdout.flush # this is important
end

# proc-store
while line = $stdin.gets do
  write_to_db(line)
end
对不⑦ 2024-10-06 02:56:51

查看 peach (http://peach.rubyforge.org/)。做一个并行的“每个”再简单不过了。然而,正如文档所述,您需要在 JRuby 下运行才能使用 JVM 的本机线程。

有关多线程功能的更多详细信息,请参阅 Jorg Mittag 对 这个 SO 问题的回复各种 Ruby 解释器。

Check out peach (http://peach.rubyforge.org/). Doing a parallel "each" couldn't be simpler. However, as the documentation says, you'll need to run under JRuby in order to use the JVM's native threading.

See Jorg Mittag's response to this SO question for a lot of detail on the multithreading capabilities of the various Ruby interpreters.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文