"Lock-free" use of a write-once, read-often value, combined with a thread pool
I am facing the following problem:
My program uses the concept of "tasks", where a task consists of code (functions) that together make up a critical area. Each task (object) knows what its "key" is, but that key might be shared with other tasks. The "key" (a pointer to yet another resource, called B below) does not change for the lifetime of the task. Yet, it has to exist before I can create the second resource.
To clarify, from the point of the view of a task:
- Create resource A, if that does not already exist.
- Once we have A, create resource B(A), if that does not already exist.
where B is a function of A: only one B should be created per A.
Multiple such tasks exist and some might create different A's, or see
that their A already exists and use the same A as other tasks.
The creation of B hence requires its own mutex: all tasks run in parallel, but only one B (per A) should be created.
Once B is created, however, I do not want to have to lock that mutex every time just in case it is still being created. The creation happens only once, while reading happens very often during the rest of the program's execution.
The program flow is therefore as follows:
class Application {
    std::map<A*, B> all_B;
    std::mutex all_B_mutex;

    B* B_broker(A*);
};

struct A {
    std::atomic<B*> cached_B_ptr;
};
Each task, as soon as it has A, calls B_broker(A) once. This function locks a mutex and creates B, if that doesn't exist yet, and returns a pointer to that new B.

The thread that gets a non-NULL pointer from B_broker(A) stores this B* in a member variable of A: A::cached_B_ptr.

If B(A) already exists, then B_broker(A) returns nullptr. The calling thread then waits until it sees that A::cached_B_ptr is non-null.

All accesses to A::cached_B_ptr are done with memory_order_relaxed, so that there is no difference from just reading normal variables without a lock (hence the "lock free" in the title). During normal operation of the program this variable is only read, so there is no contention.
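The body of B_broker is not shown in the question; a minimal sketch of how it might look, assuming that returning nullptr for an already-existing B is the intended contract (std::map::try_emplace performs the "create if absent" step in a single call, and std::map guarantees stable addresses for its mapped values):

```cpp
#include <map>
#include <mutex>

struct B { /* payload elided */ };
struct A;  // A only needs to be declared to serve as a map key

class Application {
    std::map<A*, B> all_B;
    std::mutex all_B_mutex;

public:
    // Locks the mutex; creates B(A) if it does not exist yet and
    // returns a pointer to the freshly created B, or nullptr when
    // another task already created it.
    B* B_broker(A* key) {
        std::lock_guard<std::mutex> lock(all_B_mutex);
        auto [iter, inserted] = all_B.try_emplace(key);
        return inserted ? &iter->second : nullptr;
    }
};
```

The mutex only protects the map; publishing the resulting pointer to other tasks is the separate, lock-free step discussed below.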
Although a given task runs inside its own critical area, different tasks might share the same A, so access to A::cached_B_ptr is not protected by a mutex (or not by the same mutex, anyway).
The question now is: can this be done safely? Where,

- Thread A writes to A::cached_B_ptr (relaxed), with no related mutexes.
- Thread B runs a task, unrelated to A, and wants to use A::cached_B_ptr. B does not write to A::cached_B_ptr (B_broker(A) returned nullptr), but since A::cached_B_ptr is not protected by a mutex, it also can't rely on it already being set; so B waits until it reads a non-null value.
- Thread C continues to run the task of B and also wants to use A::cached_B_ptr, but this time without any checks or waiting. What B did is a once-per-task operation that is only done when a task is started, and only shortly after the program was started (when there is still a possible race with A writing to A::cached_B_ptr).
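The per-task step described above can be sketched as follows. The helper name acquire_cached_B is hypothetical, and broker stands in for Application::B_broker; all accesses to cached_B_ptr are relaxed, exactly as in the question:

```cpp
#include <atomic>
#include <thread>

struct B { /* payload elided */ };
struct A { std::atomic<B*> cached_B_ptr{nullptr}; };

// Hypothetical helper: the once-per-task call to the broker.
// If this task created B, publish the pointer; otherwise spin
// until the creating task has published it.
template <typename Broker>
B* acquire_cached_B(A* a, Broker&& broker) {
    if (B* created = broker(a)) {
        // This task created B: publish it (relaxed, as in the question).
        a->cached_B_ptr.store(created, std::memory_order_relaxed);
        return created;
    }
    // Another task created B (or is still creating it): wait until
    // the creator has stored the pointer.
    B* b;
    while ((b = a->cached_B_ptr.load(std::memory_order_relaxed)) == nullptr)
        std::this_thread::yield();
    return b;
}
```

Thread C in the scenario above corresponds to code that skips even this check and reads cached_B_ptr directly.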
PS The following two questions might be related:
Comments (1)
I tested this using genmc, with the following program:
Note that genmc works on C code only, but the memory model it uses is the same as that of C++. It verifies the possible executions of the abstract machine on which the memory model is based, at the level of LLVM Intermediate Representation, so it should not matter that the threading syntax is based on pthreads.
The snippet under test runs a thread A that writes some value to x, which corresponds to our std::atomic<B*> A::cached_B_ptr. This is a relaxed write without any locking.

Then an independent thread B reads x and loops until it reads a non-NULL value. Or, instead, we read it once and tell genmc that we may assume it read the value that A writes.

A thread C plays the role of running the task that B started after B finished this initialization (calling B_broker(A), seeing it return nullptr, and then reading A::cached_B_ptr until that returned a non-NULL value). Hence we are only interested in executions of this snippet where C runs after B. This is achieved by having an atomic order that is set to 1 in thread C, and verifying/assuming that it is still 0 in thread B.

Finally, in thread C we read x to verify that it is always synced with what A wrote, even though there is absolutely no synchronization with thread A, and all accesses to x are relaxed. This is done by reading x in C and writing it to a global variable (c) for later inspection. After that, order is set to 1.

The output of the program is:
In other words, under the imposed restrictions that B runs before C and that B sees the value written by A, C also always sees that same value. If that were not the case, c = 0 would also have been printed, and the number of complete executions would have been larger than 1.
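The genmc source and its output are not reproduced above. As a rough illustration only, here is a plain C++ analogue of the three threads using std::thread: a stress test, not an exhaustive model check like genmc. One difference: here thread B sets order and C waits for it, whereas the genmc version lets C run freely and uses an assume on order to discard executions where C ran first.

```cpp
#include <atomic>
#include <thread>
#include <utility>

// x plays the role of A::cached_B_ptr; 42 stands in for the non-null B*.
// Returns the values that threads "B" and "C" read from x.
std::pair<int, int> run_scenario() {
    std::atomic<int> x{0};
    std::atomic<int> order{0};
    int b_seen = 0, c_seen = 0;

    std::thread tA([&] { x.store(42, std::memory_order_relaxed); });
    std::thread tB([&] {
        // B spins until a non-null value is visible; all accesses relaxed.
        while ((b_seen = x.load(std::memory_order_relaxed)) == 0) {}
        order.store(1, std::memory_order_relaxed);  // signal "B is done"
    });
    std::thread tC([&] {
        // C starts its work only after B has finished.
        while (order.load(std::memory_order_relaxed) == 0) {}
        c_seen = x.load(std::memory_order_relaxed);
    });
    tA.join(); tB.join(); tC.join();
    return {b_seen, c_seen};
}
```

A stress run like this can only ever show a counterexample, never prove absence of one; that is what the genmc exploration adds.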