使用 ZeroMQ 消息进行线程间通信

发布于 2024-10-27 23:15:39 字数 1227 浏览 2 评论 0原文

我正在尝试使用 zeroMQ 作为在多个线程之间实现消息传递系统的方法。我尝试了下面的代码,但它不起作用;具体而言,每个线程中对 zmq_recv 的调用不会等待/阻止任何消息的执行。

你能帮我解决这段代码吗?

我正在使用 Linux 操作系统和 gcc

最诚挚的问候

AFG

    static void *
    worker_routine (void *context) {
        // Socket to talk to dispatcher
        void *receiver = zmq_socket (context, ZMQ_REP);
        zmq_connect (receiver, "inproc://workers");
        while (1) {

            zmq_msg_t request;
            zmq_msg_init( &request );
            zmq_recv( receiver, &request, 0 );
            printf ("Received request\n");
            // Do some 'work'
            usleep (1000);
            // Send reply back to client
            zmq_send (receiver, &request, 0);
        }
        zmq_close (receiver);
        return NULL;
    }

    int main (void) {

    void *context = zmq_init (1);
    void *clients = zmq_socket (context, ZMQ_REP);
    zmq_bind (clients, "inproc://workers");

    int thread_nbr;
    for (thread_nbr = 0; thread_nbr < 5; thread_nbr++) {
        pthread_t worker;
        pthread_create (&worker, NULL, worker_routine, context);
    }

    zmq_close (clients);
    zmq_term (context);
    return 0;
    }

I am trying to use zeroMQ as a way to implement a messaging system between multiple threads. I tried the code below but it doesn't work; in the specific the call to zmq_recv in each thread doesn't wait/block for any message to be executed.

Can you help me with this piece of code?

I am using Linux OS and gcc

Best Regards

AFG

    static void *
    worker_routine (void *context) {
        // Socket to talk to dispatcher
        void *receiver = zmq_socket (context, ZMQ_REP);
        zmq_connect (receiver, "inproc://workers");
        while (1) {

            zmq_msg_t request;
            zmq_msg_init( &request );
            zmq_recv( receiver, &request, 0 );
            printf ("Received request\n");
            // Do some 'work'
            usleep (1000);
            // Send reply back to client
            zmq_send (receiver, &request, 0);
        }
        zmq_close (receiver);
        return NULL;
    }

    int main (void) {

    void *context = zmq_init (1);
    void *clients = zmq_socket (context, ZMQ_REP);
    zmq_bind (clients, "inproc://workers");

    int thread_nbr;
    for (thread_nbr = 0; thread_nbr < 5; thread_nbr++) {
        pthread_t worker;
        pthread_create (&worker, NULL, worker_routine, context);
    }

    zmq_close (clients);
    zmq_term (context);
    return 0;
    }

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(3

不羁少年 2024-11-03 23:15:39

两个套接字都是 REP。你想要的是 REQ + REP。

Both sockets are REP. What you want is REQ + REP.

娇纵 2024-11-03 23:15:39

创建线程后,您将立即关闭套接字和 ZeroMQ。它们可能没有时间达到阻塞状态,如果达到阻塞状态,一旦您销毁 zmq 上下文,它们就会失败。从 zmq_term 手册页

上下文终止按以下步骤执行:

当前在上下文中打开的套接字上正在进行的任何阻塞操作都应立即返回,并返回错误代码 ETERM。

You're shutting down the socket and ZeroMQ right after you're creating the threads. They probably don't have time to reach a blocking state, and if they did, they would fail as soon as you destroy the zmq context. From the zmq_term man page:

Context termination is performed in the following steps:

Any blocking operations currently in progress on sockets open within context shall return immediately with an error code of ETERM.

久隐师 2024-11-03 23:15:39

首先,正如 @sustrik 指出的,您需要使用 REQREP,主线程和工作线程都不能是 REP

其次,您需要在主线程中提供某种阻塞循环:

int main (int argc, char **argv)
{
    void *context = zmq_init (1);
    void *clients = zmq_socket (context, ZMQ_REP); // use ZMQ_REQ on the clients
    zmq_bind (clients, "inproc://workers");

    int thread_nbr;
    for (thread_nbr = 0; thread_nbr < 5; thread_nbr++) {
        pthread_t worker;
        pthread_create (&worker, NULL, worker_routine, context);
    }

    while (TRUE)
    {
        // worker thread connected asking for work
        zmq_msg_t request;
        zmq_msg_init (&request);
        zmq_recv (clients, &request, 0);
        zmq_msg_close (&request);

        // do whatever you need to do with the clients' request here

        // send work to clients
        zmq_msg_t reply;
        zmq_msg_init_data (&reply, "Reply", 5, NULL, NULL);
        zmq_send (clients, &reply, 0);
        zmq_msg_close (&reply);
    }

    zmq_close (clients);
    zmq_term (context);
    return 0;
}

Firstly, as @sustrik noted you need to use REQ and REP, both the main thread and the worker threads cannot be REP.

Secondly, you need to provide some sort of blocking loop in your main thread:

int main (int argc, char **argv)
{
    void *context = zmq_init (1);
    void *clients = zmq_socket (context, ZMQ_REP); // use ZMQ_REQ on the clients
    zmq_bind (clients, "inproc://workers");

    int thread_nbr;
    for (thread_nbr = 0; thread_nbr < 5; thread_nbr++) {
        pthread_t worker;
        pthread_create (&worker, NULL, worker_routine, context);
    }

    while (TRUE)
    {
        // worker thread connected asking for work
        zmq_msg_t request;
        zmq_msg_init (&request);
        zmq_recv (clients, &request, 0);
        zmq_msg_close (&request);

        // do whatever you need to do with the clients' request here

        // send work to clients
        zmq_msg_t reply;
        zmq_msg_init_data (&reply, "Reply", 5, NULL, NULL);
        zmq_send (clients, &reply, 0);
        zmq_msg_close (&reply);
    }

    zmq_close (clients);
    zmq_term (context);
    return 0;
}
~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文