Unexplained behaviour in a simple MPI send/receive program

Posted 2024-12-11 11:08:46


I've had a bug in my code for some time now and have not yet figured out how to solve it.

What I'm trying to achieve is simple enough: every worker node (i.e. a node with rank != 0) gets a row (represented by a one-dimensional array) of a square structure that involves some computation. Once the computation is done, the row is sent back to the master.

For testing purposes, there is no computation involved. All that's happening is:

  • the master sends a row number to a worker, and the worker uses the row number to compute the corresponding values
  • the worker sends the array with the result values back

Now, my issue is this:

  • everything works as expected up to a certain number of elements per row (size = 1006), as long as the number of workers is > 1
  • if a row has more than 1006 elements, the workers fail to shut down and the program does not terminate
  • this only occurs if I try to send the array back to the master. If I simply send back an INT, then everything is OK (see the commented-out lines in doMasterTasks() and doWorkerTasks())

Based on the last bullet point, I assume that there must be some race-condition which only surfaces when the array to be sent back to the master reaches a certain size.

Do you have any idea what the issue could be?

Compile the following code with: mpicc -O2 -std=c99 -o simple

Run the executable like so: mpirun -np 3 simple <size> (e.g. 1006 or 1007)

Here's the code:

#include "mpi.h"
#include <stdio.h>
#include <string.h>
#include <stdlib.h>

#define MASTER_RANK 0
#define TAG_RESULT 1
#define TAG_ROW 2
#define TAG_FINISHOFF 3

int mpi_call_result, my_rank, dimension, np;

// forward declarations
void doInitWork(int argc, char **argv);
void doMasterTasks(int argc, char **argv);
void doWorkerTasks(void);
void finalize();
void quit(const char *msg, int mpi_call_result);

void shutdownWorkers() {
    printf("All work has been done, shutting down clients now.\n");
    for (int i = 0; i < np; i++) {
        MPI_Send(0, 0, MPI_INT, i, TAG_FINISHOFF, MPI_COMM_WORLD);
    }
}

void doMasterTasks(int argc, char **argv) {
    printf("Starting to distribute work...\n");
    int size = dimension;
    int * dataBuffer = (int *) malloc(sizeof(int) * size);

    int currentRow = 0;
    int receivedRow = -1;
    int rowsLeft = dimension;
    MPI_Status status;

    for (int i = 1; i < np; i++) {
        MPI_Send(&currentRow, 1, MPI_INT, i, TAG_ROW, MPI_COMM_WORLD);
        rowsLeft--;
        currentRow++;

    }

    for (;;) {
//        MPI_Recv(dataBuffer, size, MPI_INT, MPI_ANY_SOURCE, TAG_RESULT, MPI_COMM_WORLD, &status);
        MPI_Recv(&receivedRow, 1, MPI_INT, MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &status);

        if (rowsLeft == 0)
            break;

        if (currentRow > 1004)
            printf("Sending row %d to worker %d\n", currentRow, status.MPI_SOURCE);
        MPI_Send(&currentRow, 1, MPI_INT, status.MPI_SOURCE, TAG_ROW, MPI_COMM_WORLD);
        rowsLeft--;
        currentRow++;
    }
    shutdownWorkers();
    free(dataBuffer);
}

void doWorkerTasks() {
    printf("Worker %d started\n", my_rank);

    // send the processed row back as the first element in the colours array.
    int size = dimension;
    int * data = (int *) malloc(sizeof(int) * size);
    memset(data, 0, sizeof(size));

    int processingRow = -1;
    MPI_Status status;

    for (;;) {

        MPI_Recv(&processingRow, 1, MPI_INT, 0, MPI_ANY_TAG, MPI_COMM_WORLD, &status);
        if (status.MPI_TAG == TAG_FINISHOFF) {
            printf("Finish-OFF tag received!\n");
            break;
        } else {
//            MPI_Send(data, size, MPI_INT, 0, TAG_RESULT, MPI_COMM_WORLD);
            MPI_Send(&processingRow, 1, MPI_INT, 0, TAG_RESULT, MPI_COMM_WORLD);
        }
    }

    printf("Slave %d finished work\n", my_rank);
    free(data);
}

int main(int argc, char **argv) {


    if (argc == 2) {
        sscanf(argv[1], "%d", &dimension);
    } else {
        dimension = 1000;
    }

    doInitWork(argc, argv);

    if (my_rank == MASTER_RANK) {
        doMasterTasks(argc, argv);
    } else {
        doWorkerTasks();
    }
    finalize();
}

void quit(const char *msg, int mpi_call_result) {
    printf("\n%s\n", msg);
    MPI_Abort(MPI_COMM_WORLD, mpi_call_result);
    exit(mpi_call_result);
}

void finalize() {
    mpi_call_result = MPI_Finalize();
    if (mpi_call_result != 0) {
        quit("Finalizing the MPI system failed, aborting now...", mpi_call_result);
    }
}

void doInitWork(int argc, char **argv) {
    mpi_call_result = MPI_Init(&argc, &argv);
    if (mpi_call_result != 0) {
        quit("Error while initializing the system. Aborting now...\n", mpi_call_result);
    }
    MPI_Comm_size(MPI_COMM_WORLD, &np);
    MPI_Comm_rank(MPI_COMM_WORLD, &my_rank);
}

Any help is greatly appreciated!

Best,
Chris


Comments (1)

青瓷清茶倾城歌 2024-12-18 11:08:46


If you take a look at your doWorkerTasks, you see that the workers send exactly as many data messages as they receive (and they receive one extra message to shut them down).

But your master code:

for (int i = 1; i < np; i++) {
    MPI_Send(&currentRow, 1, MPI_INT, i, TAG_ROW, MPI_COMM_WORLD);
    rowsLeft--;
    currentRow++;

}

for (;;) {
    MPI_Recv(dataBuffer, size, MPI_INT, MPI_ANY_SOURCE, TAG_RESULT, MPI_COMM_WORLD, &status);

    if (rowsLeft == 0)
        break;

    MPI_Send(&currentRow, 1, MPI_INT, status.MPI_SOURCE, TAG_ROW, MPI_COMM_WORLD);
    rowsLeft--;
    currentRow++;
}

sends np-2 more data messages than it receives. In particular, it only keeps receiving data until it has no more to send, even though there should be np-2 more data messages outstanding. Changing the code to the following:

int rowsLeftToSend = dimension;
int rowsLeftToReceive = dimension;

for (int i = 1; i < np; i++) {
    MPI_Send(&currentRow, 1, MPI_INT, i, TAG_ROW, MPI_COMM_WORLD);
    rowsLeftToSend--;
    currentRow++;

}

while (rowsLeftToReceive > 0) {
    MPI_Recv(dataBuffer, size, MPI_INT, MPI_ANY_SOURCE, TAG_RESULT, MPI_COMM_WORLD, &status);
    rowsLeftToReceive--;

    if (rowsLeftToSend > 0) {
        if (currentRow > 1004)
            printf("Sending row %d to worker %d\n", currentRow, status.MPI_SOURCE);
        MPI_Send(&currentRow, 1, MPI_INT, status.MPI_SOURCE, TAG_ROW, MPI_COMM_WORLD);
        rowsLeftToSend--;
        currentRow++;
    }
}

Now works.

Why the code doesn't deadlock (note this is deadlock, not a race condition; this is a more common parallel error in distributed computing) for smaller message sizes is a subtle detail of how most MPI implementations work. Generally, MPI implementations just "shove" small messages down the pipe whether or not the receiver is ready for them, but larger messages (since they take more storage resources on the receiving end) need some handshaking between the sender and the receiver. (If you want to find out more, search for eager vs rendezvous protocols).
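
To make the eager vs. rendezvous distinction concrete, here is a minimal, self-contained sketch (my own illustration, not part of the question's code; the ~4 MB message size and the behaviour described in the comments depend on your MPI library's eager threshold, which varies): rank 1's small send usually returns right away even though rank 0 has not posted a receive yet, while the large send typically blocks inside MPI_Send until rank 0 posts the matching MPI_Recv.

#include "mpi.h"
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>

int main(int argc, char **argv) {
    MPI_Init(&argc, &argv);
    int rank, nprocs;
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    MPI_Comm_size(MPI_COMM_WORLD, &nprocs);
    if (nprocs < 2) {
        if (rank == 0) printf("Run with at least 2 processes.\n");
        MPI_Finalize();
        return 1;
    }

    int small = 42;
    int big_count = 1 << 20;                 /* ~4 MB, well above typical eager limits */
    int *big = malloc(sizeof(int) * big_count);

    if (rank == 1) {
        /* usually completes eagerly, even though no receive is posted yet */
        MPI_Send(&small, 1, MPI_INT, 0, 0, MPI_COMM_WORLD);
        printf("small send returned\n");
        /* typically blocks here until rank 0 posts the matching receive (rendezvous) */
        MPI_Send(big, big_count, MPI_INT, 0, 1, MPI_COMM_WORLD);
        printf("big send returned\n");
    } else if (rank == 0) {
        sleep(2);                            /* deliberately delay posting the receives */
        MPI_Recv(&small, 1, MPI_INT, 1, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
        MPI_Recv(big, big_count, MPI_INT, 1, 1, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
    }

    free(big);
    MPI_Finalize();
    return 0;
}

Run it with something like mpirun -np 2; whether the large send really waits depends on the library and transport, so treat it purely as an illustration of the protocol switch, not as guaranteed behaviour.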

So for the small-message case (fewer than 1006 ints here, and 1 int certainly works, too), the worker nodes completed their sends whether or not the master was receiving them. If the master had called MPI_Recv(), the messages would already have been there and the call would have returned immediately. But it didn't, so there were pending messages on the master side; that didn't matter, though: the master sent out its kill messages, and everyone exited.

But for larger messages, the remaining send()s need the receiver to participate before they can complete, and since the receiver never does, the remaining workers hang.

Note that even in the small-message case where there was no deadlock, the code didn't work properly: the computed row data was never actually collected by the master.
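
For example, once the commented-out array send/receive is re-enabled, the corrected receive loop above could actually store the rows. Here is a minimal sketch under two assumptions of mine that are not in the original code: the worker writes its row index into data[0] (as the comment in doWorkerTasks() hints at), and the master keeps a hypothetical dimension x dimension result buffer (memcpy needs <string.h>, which the program already includes).

int *result = malloc(sizeof(int) * dimension * dimension);   /* hypothetical storage for all rows */

while (rowsLeftToReceive > 0) {
    MPI_Recv(dataBuffer, size, MPI_INT, MPI_ANY_SOURCE, TAG_RESULT, MPI_COMM_WORLD, &status);
    rowsLeftToReceive--;

    int row = dataBuffer[0];                                  /* assumed convention: worker stores its row index here */
    memcpy(&result[row * dimension], dataBuffer, sizeof(int) * dimension);

    if (rowsLeftToSend > 0) {
        MPI_Send(&currentRow, 1, MPI_INT, status.MPI_SOURCE, TAG_ROW, MPI_COMM_WORLD);
        rowsLeftToSend--;
        currentRow++;
    }
}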

Update: There was a similar problem in your shutdownWorkers:

void shutdownWorkers() {
    printf("All work has been done, shutting down clients now.\n");
    for (int i = 0; i < np; i++) {
        MPI_Send(0, 0, MPI_INT, i, TAG_FINISHOFF, MPI_COMM_WORLD);
    }
}

Here you are sending to all processes, including rank 0, the one doing the sending. In principle, that MPI_Send should deadlock, as it is a blocking send and there isn't a matching receive already posted. You could post a non-blocking receive beforehand to avoid this, but that's unnecessary -- rank 0 doesn't need to tell itself to finish. So just change the loop to

    for (int i = 1; i < np; i++)
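
For completeness, the corrected helper would then read:

void shutdownWorkers() {
    printf("All work has been done, shutting down clients now.\n");
    /* start at rank 1: the master (rank 0) does not need to notify itself */
    for (int i = 1; i < np; i++) {
        MPI_Send(0, 0, MPI_INT, i, TAG_FINISHOFF, MPI_COMM_WORLD);
    }
}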

tl;dr - your code deadlocked because the master wasn't receiving enough messages from the workers; it happened to work for small message sizes because of an implementation detail common to most MPI libraries.
