How to lock an IO shared by fork in Ruby
How can we lock an IO that is shared by multiple Ruby processes?
Consider this script:
#!/usr/bin/ruby -w
# vim: ts=2 sw=2 et
if ARGV.length != 2
  $stderr.puts "Usage: test-io-fork.rb num_child num_iteration"
  exit 1
end
CHILD = ARGV[0].to_i
ITERATION = ARGV[1].to_i
def now
  t = Time.now
  "#{t.strftime('%H:%M:%S')}.#{t.usec}"
end
MAP = %w(nol satu dua tiga empat lima enam tujuh delapan sembilan)
IO.popen('-', 'w') {|pipe|
  unless pipe
    # Logger child
    File.open('test-io-fork.log', 'w') {|log|
      log.puts "#{now} Program start"
      $stdin.each {|line|
        log.puts "#{now} #{line}"
      }
      log.puts "#{now} Program end"
    }
    exit!
  end
  pipe.sync = true
  pipe.puts "Before fork"
  CHILD.times {|c|
    fork {
      pid = Process.pid
      srand
      ITERATION.times {|i|
        n = rand(9)
        sleep(n / 100000.0)
        pipe.puts "##{c}:#{i} #{MAP[n]} => #{n}, #{n} => #{MAP[n]} ##{c}:#{i}"
      }
    }
  }
}
And try it like this:
./test-io-fork.rb 200 50
As expected, the test-io-fork.log file will contain signs of an IO race condition.
What I want to achieve is to make a TCP server for a custom GPS protocol that will save the GPS points to a database. Because this server would handle 1000 concurrent clients, I would like to restrict database connections to only one child instead of opening 1000 database connections simultaneously. This server will run on Linux.
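Roughly, the shape I have in mind follows the same pattern as the test script above: one forked child owns the single database connection and drains a shared pipe, while the per-client handler processes only write short, newline-terminated records to it. A sketch of that layout (the port number, record handling, and save_point call are placeholders, not real code I have yet):

require 'socket'

db_pipe = IO.popen('-', 'w')          # nil in the forked child, a write handle in the parent
unless db_pipe
  # Database-writer child: the only process that would hold a DB connection.
  $stdin.each_line {|record|
    # save_point(record)              # placeholder for the actual DB insert
  }
  exit!
end
db_pipe.sync = true

server = TCPServer.new(5000)          # placeholder port
loop {
  client = server.accept
  handler = fork {
    while (line = client.gets)        # one GPS point per line; protocol parsing omitted
      db_pipe.puts line.chomp         # forward one short, newline-terminated record per point
    end
    client.close
    exit!
  }
  Process.detach(handler)             # don't accumulate zombie handler processes
  client.close                        # parent keeps only the listening socket
}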
1 Answer
UPDATE

It may be bad form to update after the answer was accepted, but the original is a bit misleading. Whether or not ruby makes a separate write(2) call for the automatically appended newline is dependent upon the buffering state of the output IO object. $stdout (when connected to a tty) is generally line-buffered, so the effect of a puts() -- given a reasonably sized string -- with its implicitly added newline is a single call to write(2). Not so, however, with IO.pipe and $stderr, as the OP discovered.

ORIGINAL ANSWER
Change your chief pipe.puts() argument to be a newline-terminated string:
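Applied to the script in the question, that amounts to building the newline into the string yourself, along these lines (puts() will not append a second newline when the string already ends with one):

pipe.puts "##{c}:#{i} #{MAP[n]} => #{n}, #{n} => #{MAP[n]} ##{c}:#{i}\n"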
Why? You set pipe.sync hoping that the pipe writes would be atomic and non-interleaved, since they are (presumably) less than PIPE_BUF bytes. But it didn't work, because ruby's pipe puts() implementation makes a separate call to write(2) to append the trailing newline, and that's why your writes are sometimes interleaved where you expected a newline. Here's a corroborating excerpt from a fork-following strace of your script:
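(A trace like that can be reproduced by running the script under strace with fork following enabled and only write calls traced, for example strace -f -e trace=write -o trace.out ./test-io-fork.rb 200 50, then looking for back-to-back write(2) calls on the pipe's file descriptor from different pids.)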
But putting in your own newline solves the problem, making sure that your entire record is transmitted in one syscall:
If for some reason that cannot work for you, you'll have to coordinate an interprocess mutex (like File.flock()).
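A minimal sketch of that fallback, fitted into the question's writer loop (the lock-file name is made up here). Each child opens the lock file itself after fork, so every process gets its own open file description and the exclusive flock genuinely excludes the other writers:

ITERATION.times {|i|
  n = rand(9)
  sleep(n / 100000.0)
  File.open('test-io-fork.lock', File::RDWR | File::CREAT, 0644) {|lock|
    lock.flock(File::LOCK_EX)   # block until we hold the exclusive lock
    pipe.puts "##{c}:#{i} #{MAP[n]} => #{n}, #{n} => #{MAP[n]} ##{c}:#{i}"
  }                             # closing the file releases the lock
}

This serializes every record at the cost of a lock acquisition per write, which is why the single-write(2) approach above is preferable whenever the records fit in PIPE_BUF.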