MPI_Get 在父/子上下文中无法正常工作
最近在课堂上,我们一直在学习一种使用 MPI 的新方法,即父/子方法。我们的任务是在 C/C++ 中实现一个非常简单的矩阵/向量乘法,并在集群上实现基准测试。我们使用的是 OpenMPI 4.0.3。
我尝试实现一个“池”系统(孩子们选择一定量的工作,执行它,然后将结果放回主线程,并检查是否还有更多工作要做)。为此,我简单地创建了一个无限循环,子级所做的第一件事就是获取当前的偏移量。虽然偏移低于要处理的向量总数,但它会更新父线程上的偏移,获取向量,处理它们,...
要获取offset,我创建了一个专用的MPI_Win,孩子们可以用它来获取/更新值。问题是,MPI_Get 调用似乎没有更新子线程上偏移 的值。
这是我编写的代码的简化版本(我的代码包含大量日志,将结果写入文件,...)。
parent.cpp:
int main(int argc, char **argv) {
// Init MPI
int pid = -1, nprocs = -1;
MPI_Init(&argc, &argv);
MPI_Comm_size(MPI_COMM_WORLD, &nprocs);
assert(nprocs == 1);
MPI_Comm_rank(MPI_COMM_WORLD, &pid);
assert(pid == 0);
// Read CLI arguments
const unsigned int n = atoi(argv[1]);
const unsigned int m = atoi(argv[2]);
const unsigned int root = atoi(argv[4]);
assert(root < nprocs);
const unsigned int nslave = atoi(argv[5]);
const std::string name = argv[6];
const std::string slave_name = argv[7];
// Define size constants
const size_t nn = n * n;
const size_t mn = m * n;
// Spawning slaves & merging Comm
int intrapid = -1;
MPI_Comm intercom = nullptr, intracom = nullptr;
MPI_Comm_spawn(slave_name.c_str(), argv, nslave,
MPI_INFO_NULL, root, MPI_COMM_WORLD,
&intercom, MPI_ERRCODES_IGNORE);
MPI_Intercomm_merge(intercom, 0, &intracom);
MPI_Comm_rank(intracom, &intrapid);
// Initialize & broadcast matrix
int *matrix = new int[nn];
srand(time(nullptr));
for (size_t i = 0; i < nn; i++) matrix[i] = rand() % MATRIX_MAX;
MPI_Bcast(matrix, nn, MPI_INT, root, intracom);
// initialize result and offset
int offset = 0;
int *results = new int[mn];
// Initialize and generate vectors
int *vectors = new int[mn];
for (size_t i = 0; i < m; i++) generate_vector(n, vectors + (i * n), rand() % (n / 2));
// Allocate windows
MPI_Win vectors_win = nullptr, results_win = nullptr, offset_win = nullptr;
MPI_Win_create(vectors, mn, sizeof(int), MPI_INFO_NULL, intracom, &vectors_win);
MPI_Win_create(results, mn, sizeof(int), MPI_INFO_NULL, intracom, &results_win);
MPI_Win_create(&offset, 1, sizeof(int), MPI_INFO_NULL, intracom, &offset_win);
// Fence to wait for windows initialization
MPI_Win_fence(MPI_MODE_NOPRECEDE, vectors_win);
// Start chrono while slaves fetch & compute
Time debut = NOW;
// Fence to wait for all vectors to be computed
MPI_Win_fence(MPI_MODE_NOSUCCEED, results_win);
// Write results to file, free memory, finalize
// ...
return EXIT_SUCCESS;
}
child.cpp:
int main(int argc, char **argv) {
MPI_Init(&argc, &argv);
int pid = -1, intraprid = -1, nprocs = -1;
MPI_Comm intercom = nullptr, intracom = nullptr;
MPI_Comm_size(MPI_COMM_WORLD, &nprocs);
assert(nprocs >= 1);
MPI_Comm_rank(MPI_COMM_WORLD, &pid);
assert(pid >= 0 && pid < nprocs);
// Get communicator for intra-process communication through merge
MPI_Comm_get_parent(&intercom);
MPI_Intercomm_merge(intercom, 1, &intracom);
MPI_Comm_rank(intracom, &intraprid);
assert(intraprid >= 0);
// Read CLI arguments
const unsigned int n = atoi(argv[2]);
const unsigned int m = atoi(argv[3]);
const unsigned int batch_sz = atoi(argv[4]);
const unsigned int root = atoi(argv[5]);
assert(root < nprocs);
// Define size constant
const size_t nn = n * n;
// Allocate matrix memory & fetch from master
int *matrix = new int[nn];
MPI_Bcast(matrix, nn, MPI_INT, root, intracom);
// Allocate batch memory
int *batch = new int[batch_sz * n];
// Initialize dull windows (to match master initialization)
MPI_Win vectors_win = nullptr, results_win = nullptr, offset_win = nullptr;
MPI_Win_create(nullptr, 0, 1, MPI_INFO_NULL, intracom, &vectors_win);
MPI_Win_create(nullptr, 0, 1, MPI_INFO_NULL, intracom, &results_win);
MPI_Win_create(nullptr, 0, 1, MPI_INFO_NULL, intracom, &offset_win);
// Fence to wait for windows initialization
MPI_Win_fence(MPI_MODE_NOPRECEDE, vectors_win);
int offset = -1, new_offset = -1;
// Infinite loop (break on first condition when no more vectors to process)
while (true) {
// Get offset from master
MPI_Win_lock(MPI_LOCK_EXCLUSIVE, root, 0, offset_win);
MPI_Get(&offset, 1, MPI_INT, root, 0, 1, MPI_INT, offset_win);
// If offset is -1, something went wrong with the previous MPI_Get, but MPI_SUCCESS was returned
assert(offset >= 0);
// Break if no more vectors to process
if (new_offset >= m - 1 || offset >= m - 1) {
MPI_Win_unlock(root, offset_win);
break;
}
// Get quantity of vectors to process (if not enough, get all remaining)
const size_t sz = (offset + batch_sz > m) ? m - offset : batch_sz;
// if sz > batch_sz, the received buffer will be overflown
assert(sz <= batch_sz);
// Compute the new vector offset for the other slaves
new_offset = offset + sz;
// Update the offset on master
MPI_Put(&new_offset, 1, MPI_INT, root, 0, 1, MPI_INT, offset_win);
MPI_Win_unlock(root, offset_win);
// Fetch the batch of vectors to process
MPI_Win_lock(MPI_LOCK_SHARED, root, 0, vectors_win);
MPI_Get(batch, sz * n, MPI_INT, root, offset * n, sz * n, MPI_INT, vectors_win);
MPI_Win_unlock(root, vectors_win);
// Process the batch
for (size_t i = 0; i < sz; ++i) {
// ... matrix multiplication
}
// Put the result in the results window of the master
MPI_Win_lock(MPI_LOCK_EXCLUSIVE, root, 0, results_win);
MPI_Put(&batch, sz * n, MPI_INT, root, offset, sz * n, MPI_INT, results_win);
MPI_Win_unlock(root, results_win);
}
// Fence to wait for all vectors to be computed
MPI_Win_fence(MPI_MODE_NOSUCCEED, results_win);
// Free memory, finalize
// ...
return EXIT_SUCCESS;
}
问题在于子 while 循环开头的断言 assert(offset >= 0)
被触发(日志显示 offset em> 仍然是 -1,或者它初始化时使用的任何值)。鉴于父线程上的偏移量从 0 开始,这意味着变量未更新,但对 MPI_Get 的调用返回了 MPI_SUCCESS。我想到了并发问题,但似乎锁工作正常,因为孩子们在进入锁之前等待前一个锁崩溃。
我试图解决这个问题,但由于缺乏明确的文档,我没有成功。我要么犯了一个我没有注意到的愚蠢的打字错误,要么我不知道这种方法有一些具体的东西。
如果有人知道我做错了什么,我很乐意接受。请原谅我的任何英语错误,我很累。
编辑:根据要求,我将名称更改为“父母/儿童”,而不是旧术语
编辑2:正如所指出的,这里的栅栏没有用。我知道这一点,它们只是来自我用作样板代码的先前版本的代码。
Recently in class, we've been learning about a new way to use MPI, with the Parent/Children approach. We were tasked with implementing a really simple matrix/vector multiplication in C/C++, and realise benchmarks on a cluster. We're using OpenMPI 4.0.3.
I tried to implement a "pooling" system (Children pick a certain amount of work, do it, then put the result back on the master thread, and check if there's more work to do). To do so, I simply created a infinite loop, and the first thing a child does, is to fetch the current offset. While the offset is lower than the total number of vectors to process, it updates the offset on the parent thread, fetch the vectors, process them, ...
To fetch the offset, I created a dedicated MPI_Win, that children can use to fetch/update the value. The thing is, the MPI_Get call dosen't seem to update the value of the offset on the children threads.
Here are simplified versions of the code I wrote (mine contains a lot of logs, write results to a file, ...).
parent.cpp:
int main(int argc, char **argv) {
// Init MPI
int pid = -1, nprocs = -1;
MPI_Init(&argc, &argv);
MPI_Comm_size(MPI_COMM_WORLD, &nprocs);
assert(nprocs == 1);
MPI_Comm_rank(MPI_COMM_WORLD, &pid);
assert(pid == 0);
// Read CLI arguments
const unsigned int n = atoi(argv[1]);
const unsigned int m = atoi(argv[2]);
const unsigned int root = atoi(argv[4]);
assert(root < nprocs);
const unsigned int nslave = atoi(argv[5]);
const std::string name = argv[6];
const std::string slave_name = argv[7];
// Define size constants
const size_t nn = n * n;
const size_t mn = m * n;
// Spawning slaves & merging Comm
int intrapid = -1;
MPI_Comm intercom = nullptr, intracom = nullptr;
MPI_Comm_spawn(slave_name.c_str(), argv, nslave,
MPI_INFO_NULL, root, MPI_COMM_WORLD,
&intercom, MPI_ERRCODES_IGNORE);
MPI_Intercomm_merge(intercom, 0, &intracom);
MPI_Comm_rank(intracom, &intrapid);
// Initialize & broadcast matrix
int *matrix = new int[nn];
srand(time(nullptr));
for (size_t i = 0; i < nn; i++) matrix[i] = rand() % MATRIX_MAX;
MPI_Bcast(matrix, nn, MPI_INT, root, intracom);
// initialize result and offset
int offset = 0;
int *results = new int[mn];
// Initialize and generate vectors
int *vectors = new int[mn];
for (size_t i = 0; i < m; i++) generate_vector(n, vectors + (i * n), rand() % (n / 2));
// Allocate windows
MPI_Win vectors_win = nullptr, results_win = nullptr, offset_win = nullptr;
MPI_Win_create(vectors, mn, sizeof(int), MPI_INFO_NULL, intracom, &vectors_win);
MPI_Win_create(results, mn, sizeof(int), MPI_INFO_NULL, intracom, &results_win);
MPI_Win_create(&offset, 1, sizeof(int), MPI_INFO_NULL, intracom, &offset_win);
// Fence to wait for windows initialization
MPI_Win_fence(MPI_MODE_NOPRECEDE, vectors_win);
// Start chrono while slaves fetch & compute
Time debut = NOW;
// Fence to wait for all vectors to be computed
MPI_Win_fence(MPI_MODE_NOSUCCEED, results_win);
// Write results to file, free memory, finalize
// ...
return EXIT_SUCCESS;
}
child.cpp:
int main(int argc, char **argv) {
MPI_Init(&argc, &argv);
int pid = -1, intraprid = -1, nprocs = -1;
MPI_Comm intercom = nullptr, intracom = nullptr;
MPI_Comm_size(MPI_COMM_WORLD, &nprocs);
assert(nprocs >= 1);
MPI_Comm_rank(MPI_COMM_WORLD, &pid);
assert(pid >= 0 && pid < nprocs);
// Get communicator for intra-process communication through merge
MPI_Comm_get_parent(&intercom);
MPI_Intercomm_merge(intercom, 1, &intracom);
MPI_Comm_rank(intracom, &intraprid);
assert(intraprid >= 0);
// Read CLI arguments
const unsigned int n = atoi(argv[2]);
const unsigned int m = atoi(argv[3]);
const unsigned int batch_sz = atoi(argv[4]);
const unsigned int root = atoi(argv[5]);
assert(root < nprocs);
// Define size constant
const size_t nn = n * n;
// Allocate matrix memory & fetch from master
int *matrix = new int[nn];
MPI_Bcast(matrix, nn, MPI_INT, root, intracom);
// Allocate batch memory
int *batch = new int[batch_sz * n];
// Initialize dull windows (to match master initialization)
MPI_Win vectors_win = nullptr, results_win = nullptr, offset_win = nullptr;
MPI_Win_create(nullptr, 0, 1, MPI_INFO_NULL, intracom, &vectors_win);
MPI_Win_create(nullptr, 0, 1, MPI_INFO_NULL, intracom, &results_win);
MPI_Win_create(nullptr, 0, 1, MPI_INFO_NULL, intracom, &offset_win);
// Fence to wait for windows initialization
MPI_Win_fence(MPI_MODE_NOPRECEDE, vectors_win);
int offset = -1, new_offset = -1;
// Infinite loop (break on first condition when no more vectors to process)
while (true) {
// Get offset from master
MPI_Win_lock(MPI_LOCK_EXCLUSIVE, root, 0, offset_win);
MPI_Get(&offset, 1, MPI_INT, root, 0, 1, MPI_INT, offset_win);
// If offset is -1, something went wrong with the previous MPI_Get, but MPI_SUCCESS was returned
assert(offset >= 0);
// Break if no more vectors to process
if (new_offset >= m - 1 || offset >= m - 1) {
MPI_Win_unlock(root, offset_win);
break;
}
// Get quantity of vectors to process (if not enough, get all remaining)
const size_t sz = (offset + batch_sz > m) ? m - offset : batch_sz;
// if sz > batch_sz, the received buffer will be overflown
assert(sz <= batch_sz);
// Compute the new vector offset for the other slaves
new_offset = offset + sz;
// Update the offset on master
MPI_Put(&new_offset, 1, MPI_INT, root, 0, 1, MPI_INT, offset_win);
MPI_Win_unlock(root, offset_win);
// Fetch the batch of vectors to process
MPI_Win_lock(MPI_LOCK_SHARED, root, 0, vectors_win);
MPI_Get(batch, sz * n, MPI_INT, root, offset * n, sz * n, MPI_INT, vectors_win);
MPI_Win_unlock(root, vectors_win);
// Process the batch
for (size_t i = 0; i < sz; ++i) {
// ... matrix multiplication
}
// Put the result in the results window of the master
MPI_Win_lock(MPI_LOCK_EXCLUSIVE, root, 0, results_win);
MPI_Put(&batch, sz * n, MPI_INT, root, offset, sz * n, MPI_INT, results_win);
MPI_Win_unlock(root, results_win);
}
// Fence to wait for all vectors to be computed
MPI_Win_fence(MPI_MODE_NOSUCCEED, results_win);
// Free memory, finalize
// ...
return EXIT_SUCCESS;
}
The problem is that the assertion assert(offset >= 0)
at the beginning of the child while loop is triggered (and logs show that offset is still -1, or whatever it was initialised with). Given that the offset starts at 0 on the parent thread, it means that the variable was not updated, but the call to MPI_Get returned MPI_SUCCESS. I though about a concurrency problem, but it seems that the lock works fine, as the children wait for the previous one to crash before entering the lock.
I've tried to resolve the problem, but given the lack of a clear documentation, I didn't succeed. I either made a stupid typo that I didn't catch, or there's something specific about this approach that I'm not aware of.
If someone has an idea about what I did wrong, I'd gladly accept it. Please excuse me for any english mistakes, I'm quite tired.
Edit: As requested, I switched names to "Parent/Children", instead of the old terminology
Edit 2: As it's been pointed out, the fences here are useless. I'm aware of this, they just come from the previous version of the code that I used as boilerplate code.
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论
评论(1)
您的大问题是您立即使用通过
MPI_Get
检索到的变量。按照你的方式这是不可能的。该变量仅在释放锁或执行同步调用后才具有其值。因为您有条件地释放锁,所以我会在MPI_Get
调用之后插入MPI_Win_flush_local
,以确保目标和源结果之间的一致性。编辑。另一件事是您混合了主动(
fence
)和被动(lock
)目标同步。在您的代码中,栅栏不会执行任何操作,因此请将其删除。如果所有子进程执行相同数量的获取或放置,则栅栏将是合适的:那么纪元的关闭栅栏将确保源/目标上数据的一致性。Your big problem is that you immediately use the variable you retrieve with
MPI_Get
. That is not possible the way you do it. This variable only has its value after you release the lock, or after you do a synchronization call. Because you release the lock conditionally, I would insertMPI_Win_flush_local
after theMPI_Get
call, to ensure coherence between results on the target and origin.EDIT. Another thing is that you mix active (
fence
) and passive (lock
) target synchronization. In your code the fence doesn't do anything, so remove it. A fence would be appropriate if all children do the same number of gets or puts: then the closing fence of the epoch would ensure coherence of data on origin/target.