Ruby:内存中基于哈希的线程安全缓冲区?

发布于 2024-11-02 02:08:53 字数 616 浏览 0 评论 0原文

我正在 Redis 支持的库中实现一种写入/存储缓冲区,以将多个 hincrby 调用压缩为单个调用。缓冲区需要完全原子化并且跨多个线程工作。

我对处理线程安全还很陌生,因此;是否有任何现有的库或标准化方法来实现在线程环境中正常工作的基于全局哈希的缓冲区/队列?

例如,缓冲区哈希的工作方式类似于以下伪代码:

buffer #=> { :ident1 => { :value_a => 1, :value_b => 4 },
       #     :ident2 => { :value_a => 2, :value_b => 3 } }

buffer[:ident1][:value_a] #=> 1

# saving merges and increments {:value_a => 2} into buffer[:ident1]
save(:ident1, {:value_a => 2})

buffer[:ident1][:value_a] #=> 3

其想法是,在 X 次 save 调用之后,通过对来自的每个项目调用 save 来刷新缓冲区缓冲区。

I'm implementing a kind of write/store buffer in a Redis-backed library to squash multiple hincrby calls into a single call. The buffer needs to be fully atomic and work across multiple threads.

I'm quite new to dealing with thread-safety, hence; Are there any existing libraries or standardized ways to implement a global Hash-based buffer/queue that works fine in threaded environments?

As an example, the buffer hash would work something like this pseudo code:

buffer #=> { :ident1 => { :value_a => 1, :value_b => 4 },
       #     :ident2 => { :value_a => 2, :value_b => 3 } }

buffer[:ident1][:value_a] #=> 1

# saving merges and increments {:value_a => 2} into buffer[:ident1]
save(:ident1, {:value_a => 2})

buffer[:ident1][:value_a] #=> 3

The idea is that after X number of save calls the buffer is flushed by calling save with each item from the buffer.

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(1

挖个坑埋了你 2024-11-09 02:08:53

一般来说,以线程安全的方式提供对全局值的访问的方法是使用内置的 Mutex 类:

$buffer = {}
$bufflock = Mutex.new

threads = (0..2).map do |i|
  Thread.new do
    puts "Starting Thread #{i}"
    3.times do
      puts "Thread #{i} got: #{$buffer[:foo].inspect}"
      $bufflock.synchronize{ $buffer[:foo] = ($buffer[:foo] || 1) * (i+1) }
      sleep rand
    end
    puts "Ending Thread #{i}"
  end
end
threads.each{ |t| t.join } # Wait for all threads to complete

#=> Starting Thread 0
#=> Thread 0 got: nil
#=> Starting Thread 1
#=> Thread 1 got: 1
#=> Starting Thread 2
#=> Thread 2 got: 2
#=> Thread 1 got: 6
#=> Thread 1 got: 12
#=> Ending Thread 1
#=> Thread 0 got: 24
#=> Thread 2 got: 24
#=> Thread 0 got: 72
#=> Thread 2 got: 72
#=> Ending Thread 0
#=> Ending Thread 2

Mutex#synchronize 块对于每个线程都是原子的;在前一个线程处理完该块之前,一个线程无法进入 $bufflock

另请参阅:Pure-Ruby 并发哈希

In general, the way that you provide access to a global value in a thread-safe manner is to use the built-in Mutex class:

$buffer = {}
$bufflock = Mutex.new

threads = (0..2).map do |i|
  Thread.new do
    puts "Starting Thread #{i}"
    3.times do
      puts "Thread #{i} got: #{$buffer[:foo].inspect}"
      $bufflock.synchronize{ $buffer[:foo] = ($buffer[:foo] || 1) * (i+1) }
      sleep rand
    end
    puts "Ending Thread #{i}"
  end
end
threads.each{ |t| t.join } # Wait for all threads to complete

#=> Starting Thread 0
#=> Thread 0 got: nil
#=> Starting Thread 1
#=> Thread 1 got: 1
#=> Starting Thread 2
#=> Thread 2 got: 2
#=> Thread 1 got: 6
#=> Thread 1 got: 12
#=> Ending Thread 1
#=> Thread 0 got: 24
#=> Thread 2 got: 24
#=> Thread 0 got: 72
#=> Thread 2 got: 72
#=> Ending Thread 0
#=> Ending Thread 2

Code inside a Mutex#synchronize block is atomic per thread; one thread cannot go into $bufflock until the previous thread is done with the block.

See also: Pure-Ruby concurrent Hash

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文