mq_notify 不通知事件(Linux 编程)

发布于 2024-10-06 02:36:50 字数 861 浏览 0 评论 0原文

我正在使用 mq_notify 来获取有关消息队列上的事件的通知,但我注册的通知程序函数没有被调用。我错过了什么吗?

我将我的代码片段粘贴在下面:


/*
 * Notification callback handed to mq_notify() through
 * sigevent.sigev_notify_function. It only reports that it ran; the
 * sigval payload is not inspected.
 */
static void sigNotifier(union sigval sv)
{
   (void) sv;
   fputs("I'm called.\n", stdout);
}

/*
 * Opens the existing queue MSG_QUEUE_NAME read-only, registers sigNotifier
 * as a SIGEV_THREAD notification on it and then keeps the process alive.
 *
 * Returns 1 when the queue cannot be opened or the registration is refused;
 * otherwise it never returns.
 */
int main()
{
   mqd_t queueID;
   struct mq_attr attr = {0};   /* every field defined before it is passed on */
   struct sigevent sev = {0};

   attr.mq_msgsize = MSG_SIZE;
   attr.mq_maxmsg = 30;

   /* Without O_CREAT the mode and attr arguments are not used by mq_open():
    * the queue must already exist and keeps its own attributes. */
   queueID = mq_open(MSG_QUEUE_NAME, O_RDONLY, 0666, &attr);
   if (queueID == (mqd_t) -1) {
      printf ("Message queue open failed: %d\n", errno);
      return 1;   /* do not call mq_notify() on an invalid descriptor */
   }

   sev.sigev_notify = SIGEV_THREAD;
   sev.sigev_notify_function = sigNotifier;
   sev.sigev_notify_attributes = NULL;
   sev.sigev_value.sival_ptr = &queueID;

   if (mq_notify(queueID, &sev) < 0) {
      printf ("Notification failed: %d\n", errno);
      mq_close(queueID);
      return 1;
   }

   /* The registration is one-shot and fires only when a message arrives on
    * an EMPTY queue; messages already queued at this point suppress it
    * until the queue has been drained.
    * NOTE: this loop busy-waits; blocking (e.g. pause() from <unistd.h>)
    * would keep the process alive without spinning a CPU. */
   while (1) {
   }
}

I'm using mq_notify to be notified about events on a message queue, but my registered notifier function is not being called. Did I miss something?

I'm pasting my code snippet below:


/*
 * Notification callback handed to mq_notify() through
 * sigevent.sigev_notify_function. It only reports that it ran; the
 * sigval payload is not inspected.
 */
static void sigNotifier(union sigval sv)
{
   (void) sv;
   fputs("I'm called.\n", stdout);
}

/*
 * Opens the existing queue MSG_QUEUE_NAME read-only, registers sigNotifier
 * as a SIGEV_THREAD notification on it and then keeps the process alive.
 *
 * Returns 1 when the queue cannot be opened or the registration is refused;
 * otherwise it never returns.
 */
int main()
{
   mqd_t queueID;
   struct mq_attr attr = {0};   /* every field defined before it is passed on */
   struct sigevent sev = {0};

   attr.mq_msgsize = MSG_SIZE;
   attr.mq_maxmsg = 30;

   /* Without O_CREAT the mode and attr arguments are not used by mq_open():
    * the queue must already exist and keeps its own attributes. */
   queueID = mq_open(MSG_QUEUE_NAME, O_RDONLY, 0666, &attr);
   if (queueID == (mqd_t) -1) {
      printf ("Message queue open failed: %d\n", errno);
      return 1;   /* do not call mq_notify() on an invalid descriptor */
   }

   sev.sigev_notify = SIGEV_THREAD;
   sev.sigev_notify_function = sigNotifier;
   sev.sigev_notify_attributes = NULL;
   sev.sigev_value.sival_ptr = &queueID;

   if (mq_notify(queueID, &sev) < 0) {
      printf ("Notification failed: %d\n", errno);
      mq_close(queueID);
      return 1;
   }

   /* The registration is one-shot and fires only when a message arrives on
    * an EMPTY queue; messages already queued at this point suppress it
    * until the queue has been drained.
    * NOTE: this loop busy-waits; blocking (e.g. pause() from <unistd.h>)
    * would keep the process alive without spinning a CPU. */
   while (1) {
   }
}

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(3)

梦里梦着梦中梦 2024-10-13 02:36:50

手册页mq_notify

1) 仅当新消息到达并且队列先前为空时才会发生消息通知。如果调用 mq_notify() 时队列不为空,则仅在队列清空且新消息到达后才会发生通知。

2)通知发生一次:通知下发后,通知注册被移除,其他进程可以注册消息通知。如果被通知的进程希望接收下一个通知,它可以使用 mq_notify() 来请求进一步的通知。

3) 同一个消息队列同一时刻只能有一个进程注册接收通知。

因此:

1) 在使用 mq_notify() 注册后,立即清空 MsgQ 以在 reader 进程中接收新消息。

2) 重新注册notify函数以接收下一条消息。

3) 仅注册 1 个进程用于从 Q 接收消息。

这是一个简单的 C++ 消息队列读取器代码:

#include <iostream>
#include <mqueue.h>
#include <string.h>
#include <sstream>
#include <unistd.h>
#include <errno.h>

using namespace std;

#define MSG_Q_NAME "/MY_MSGQ_3"

// Notification thread entry point (SIGEV_THREAD).
//
// sv.sival_ptr must point at the mqd_t of the queue that raised the
// notification. The function re-arms the one-shot registration and then
// reads the pending messages, printing each one.
//
// Exits the process when re-registration or mq_receive() fails with
// anything other than EAGAIN; returns quietly if mq_getattr() fails.
static void                     /* Thread start function */
tfunc(union sigval sv)
{
  mqd_t msq_id = *(static_cast<mqd_t*>(sv.sival_ptr));

  struct mq_attr attr;
  if(mq_getattr(msq_id, &attr) < 0)
  {
    cout << "Error in mq_getattr " << strerror(errno)  << endl;
    return;
  }

  // Reregister for new messages on Q. This is done before reading so the
  // registration is already in place when the queue becomes empty again.
  struct sigevent sev;
  sev.sigev_notify = SIGEV_THREAD;
  sev.sigev_notify_function = tfunc;
  sev.sigev_notify_attributes = NULL;
  sev.sigev_value.sival_ptr = sv.sival_ptr;
  if (mq_notify(msq_id, &sev) < 0)
  {
    cout << "Error during Reregister in msq_notify : "
         << strerror(errno) << endl;
    exit(EXIT_FAILURE);
  }

  // A notification is raised only on an empty -> non-empty transition, so
  // every pending message has to be consumed here; a message left behind
  // would suppress all further notifications. Draining is only possible on
  // a non-blocking descriptor; on a blocking one a single message is read.
  const bool nonblocking = (attr.mq_flags & O_NONBLOCK) != 0;

  // One spare byte so that a payload of exactly mq_msgsize bytes can still
  // be NUL-terminated before it is printed as a C string.
  char* arr = new char[attr.mq_msgsize + 1];
  do
  {
    ssize_t len = mq_receive(msq_id, arr, attr.mq_msgsize, 0);
    if(len < 0)
    {
      if(errno != EAGAIN)
      {
        cout << "Error in mq_receive " << strerror(errno) << endl;
        delete[] arr;
        exit(EXIT_FAILURE);
      }
      break;                    // EAGAIN: queue is empty
    }
    arr[len] = '\0';
    cout << "Msg rcvd " << arr << endl;
  } while(nonblocking);
  delete[] arr;
}

// Reader: opens (creating if necessary) MSG_Q_NAME non-blocking, registers
// tfunc for message notification, drains whatever is already queued and
// then sleeps forever while notification threads handle new messages.
int main()
{
  mqd_t msq_id = mq_open(MSG_Q_NAME, O_RDONLY | O_NONBLOCK | O_CREAT, 0666, 0);
  if(msq_id == (mqd_t) -1)
  {
    cout << "Error on msg Q creation: " << strerror(errno) << endl;
    exit(EXIT_FAILURE);
  }

  // The process is registered for notification for new message on the Q.
  // msq_id lives for the rest of the process because main() never returns,
  // so handing its address to the notification threads is safe.
  struct sigevent sev;
  sev.sigev_notify = SIGEV_THREAD;
  sev.sigev_notify_function = tfunc;
  sev.sigev_notify_attributes = NULL;
  sev.sigev_value.sival_ptr = &msq_id;

  if (mq_notify(msq_id, &sev) < 0)
  {
    cout << "Error on msg Q notify : " << strerror(errno) << endl;
    exit(EXIT_FAILURE);
  }
  else
  {
    cout << "Notify for msg Q reception " << MSG_Q_NAME << endl;
  }

  // Man Page mq_notify: Message notification occurs only when a new
  // message arrives and the queue was previously empty. If the queue was
  // not empty at the time mq_notify() was called, then a notification will
  // occur only after the queue is emptied and a new message arrives.
  //
  // So emptying the Q to recv new messages
  struct mq_attr attr;
  if(mq_getattr(msq_id, &attr) < 0)
  {
    cout << "Error in mq_getattr " << strerror(errno)  << endl;
    exit(EXIT_FAILURE);
  }

  // One spare byte so a maximum-size payload can be NUL-terminated.
  char* arr = new char[attr.mq_msgsize + 1];
  ssize_t n = 0;
  // The assignment is parenthesised so that n holds the byte count rather
  // than the result of the comparison.
  while((n = mq_receive(msq_id, arr, attr.mq_msgsize, 0)) >= 0)
  {
    arr[n] = '\0';
    cout << "Empty the Q. Msg rcvd " << arr << endl;
  }
  const int drain_errno = errno;  // save before any other call can change it
  delete[] arr;
  if(drain_errno != EAGAIN)       // EAGAIN simply means the queue is empty
  {
    cout << "Error in mq_receive " << strerror(drain_errno) << endl;
    exit(EXIT_FAILURE);
  }

  // Block instead of spinning: an empty infinite loop wastes a CPU and has
  // undefined behaviour in C++11..C++23. pause() returns only when a signal
  // handler has run, so it is simply called again.
  for(;;)
  {
    pause();
  }
}

这是一个简单的消息 Q 写入器代码:

#include <iostream>
#include <mqueue.h>
#include <string.h>
#include <sstream>
#include <unistd.h>
#include <errno.h>
#include <sys/stat.h> 

using namespace std;

#define MSG_Q_NAME "/MY_MSGQ_3"

// Writer: opens (creating if necessary) MSG_Q_NAME and sends five short
// text messages, one per second. Exits with status 1 on any failure other
// than a full queue.
int main()
{
  struct mq_attr attr;
  memset(&attr, 0, sizeof attr);
  attr.mq_msgsize = 8192;
  attr.mq_flags = 0;
  attr.mq_maxmsg = 10;

  // attr is applied only if this call actually creates the queue.
  mqd_t msq_id = mq_open(MSG_Q_NAME, O_RDWR | O_CREAT | O_NONBLOCK,
                         0777, &attr);
  if(msq_id == (mqd_t) -1)
  {
    cout << "Error on msg Q creation: " << strerror(errno) << endl;
    exit(1);
  }

  // Write 5 msgs on message Q
  for(int i = 0; i < 5; ++i)
  {
    stringstream s;
    s << "My Msg " << i;
    const string payload = s.str();   // build the text once per message

    // The terminating NUL is not transmitted; the reader must terminate
    // the buffer itself.
    if(mq_send(msq_id, payload.c_str(), payload.size(), 0) < 0)
    {
      if(errno != EAGAIN)
      {
        cout << "Error on sending msg on MsgQ " << strerror(errno) << endl;
        mq_close(msq_id);
        exit(1);
      }
      // EAGAIN on a non-blocking descriptor: the queue is full and the
      // message was not enqueued. Say so instead of dropping it silently.
      cout << "Queue full, dropped msg " << payload << endl;
    }
    else
    {
      cout << "Sent msg " << payload << endl;
    }

    sleep(1); // Easily see the received message in reader
  }

  mq_close(msq_id);
}

Man Page mq_notify:

1) Message notification occurs only when a new message arrives and the queue was previously empty. If the queue was not empty at the time mq_notify() was called, then a notification will occur only after the queue is emptied and a new message arrives.

2) Notification occurs once: after a notification is delivered, the notification registration is removed, and another process can register for message notification. If the notified process wishes to receive the next notification, it can use mq_notify() to request a further notification.

3) Only one process can be registered to receive notification from a message queue.

So:

1) Empty the MsgQ to recv new messages in the reader process immediately after registering with mq_notify().

2) Re-register in the notify function for receiving next message.

3) Register only 1 process for receiving messages from Q.

Here is a simple message queue reader code in C++:

#include <iostream>
#include <mqueue.h>
#include <string.h>
#include <sstream>
#include <unistd.h>
#include <errno.h>

using namespace std;

#define MSG_Q_NAME "/MY_MSGQ_3"

// Notification thread entry point (SIGEV_THREAD).
//
// sv.sival_ptr must point at the mqd_t of the queue that raised the
// notification. The function re-arms the one-shot registration and then
// reads the pending messages, printing each one.
//
// Exits the process when re-registration or mq_receive() fails with
// anything other than EAGAIN; returns quietly if mq_getattr() fails.
static void                     /* Thread start function */
tfunc(union sigval sv)
{
  mqd_t msq_id = *(static_cast<mqd_t*>(sv.sival_ptr));

  struct mq_attr attr;
  if(mq_getattr(msq_id, &attr) < 0)
  {
    cout << "Error in mq_getattr " << strerror(errno)  << endl;
    return;
  }

  // Reregister for new messages on Q. This is done before reading so the
  // registration is already in place when the queue becomes empty again.
  struct sigevent sev;
  sev.sigev_notify = SIGEV_THREAD;
  sev.sigev_notify_function = tfunc;
  sev.sigev_notify_attributes = NULL;
  sev.sigev_value.sival_ptr = sv.sival_ptr;
  if (mq_notify(msq_id, &sev) < 0)
  {
    cout << "Error during Reregister in msq_notify : "
         << strerror(errno) << endl;
    exit(EXIT_FAILURE);
  }

  // A notification is raised only on an empty -> non-empty transition, so
  // every pending message has to be consumed here; a message left behind
  // would suppress all further notifications. Draining is only possible on
  // a non-blocking descriptor; on a blocking one a single message is read.
  const bool nonblocking = (attr.mq_flags & O_NONBLOCK) != 0;

  // One spare byte so that a payload of exactly mq_msgsize bytes can still
  // be NUL-terminated before it is printed as a C string.
  char* arr = new char[attr.mq_msgsize + 1];
  do
  {
    ssize_t len = mq_receive(msq_id, arr, attr.mq_msgsize, 0);
    if(len < 0)
    {
      if(errno != EAGAIN)
      {
        cout << "Error in mq_receive " << strerror(errno) << endl;
        delete[] arr;
        exit(EXIT_FAILURE);
      }
      break;                    // EAGAIN: queue is empty
    }
    arr[len] = '\0';
    cout << "Msg rcvd " << arr << endl;
  } while(nonblocking);
  delete[] arr;
}

// Reader: opens (creating if necessary) MSG_Q_NAME non-blocking, registers
// tfunc for message notification, drains whatever is already queued and
// then sleeps forever while notification threads handle new messages.
int main()
{
  mqd_t msq_id = mq_open(MSG_Q_NAME, O_RDONLY | O_NONBLOCK | O_CREAT, 0666, 0);
  if(msq_id == (mqd_t) -1)
  {
    cout << "Error on msg Q creation: " << strerror(errno) << endl;
    exit(EXIT_FAILURE);
  }

  // The process is registered for notification for new message on the Q.
  // msq_id lives for the rest of the process because main() never returns,
  // so handing its address to the notification threads is safe.
  struct sigevent sev;
  sev.sigev_notify = SIGEV_THREAD;
  sev.sigev_notify_function = tfunc;
  sev.sigev_notify_attributes = NULL;
  sev.sigev_value.sival_ptr = &msq_id;

  if (mq_notify(msq_id, &sev) < 0)
  {
    cout << "Error on msg Q notify : " << strerror(errno) << endl;
    exit(EXIT_FAILURE);
  }
  else
  {
    cout << "Notify for msg Q reception " << MSG_Q_NAME << endl;
  }

  // Man Page mq_notify: Message notification occurs only when a new
  // message arrives and the queue was previously empty. If the queue was
  // not empty at the time mq_notify() was called, then a notification will
  // occur only after the queue is emptied and a new message arrives.
  //
  // So emptying the Q to recv new messages
  struct mq_attr attr;
  if(mq_getattr(msq_id, &attr) < 0)
  {
    cout << "Error in mq_getattr " << strerror(errno)  << endl;
    exit(EXIT_FAILURE);
  }

  // One spare byte so a maximum-size payload can be NUL-terminated.
  char* arr = new char[attr.mq_msgsize + 1];
  ssize_t n = 0;
  // The assignment is parenthesised so that n holds the byte count rather
  // than the result of the comparison.
  while((n = mq_receive(msq_id, arr, attr.mq_msgsize, 0)) >= 0)
  {
    arr[n] = '\0';
    cout << "Empty the Q. Msg rcvd " << arr << endl;
  }
  const int drain_errno = errno;  // save before any other call can change it
  delete[] arr;
  if(drain_errno != EAGAIN)       // EAGAIN simply means the queue is empty
  {
    cout << "Error in mq_receive " << strerror(drain_errno) << endl;
    exit(EXIT_FAILURE);
  }

  // Block instead of spinning: an empty infinite loop wastes a CPU and has
  // undefined behaviour in C++11..C++23. pause() returns only when a signal
  // handler has run, so it is simply called again.
  for(;;)
  {
    pause();
  }
}

Here is a simple message Q writer code:

#include <iostream>
#include <mqueue.h>
#include <string.h>
#include <sstream>
#include <unistd.h>
#include <errno.h>
#include <sys/stat.h> 

using namespace std;

#define MSG_Q_NAME "/MY_MSGQ_3"

// Writer: opens (creating if necessary) MSG_Q_NAME and sends five short
// text messages, one per second. Exits with status 1 on any failure other
// than a full queue.
int main()
{
  struct mq_attr attr;
  memset(&attr, 0, sizeof attr);
  attr.mq_msgsize = 8192;
  attr.mq_flags = 0;
  attr.mq_maxmsg = 10;

  // attr is applied only if this call actually creates the queue.
  mqd_t msq_id = mq_open(MSG_Q_NAME, O_RDWR | O_CREAT | O_NONBLOCK,
                         0777, &attr);
  if(msq_id == (mqd_t) -1)
  {
    cout << "Error on msg Q creation: " << strerror(errno) << endl;
    exit(1);
  }

  // Write 5 msgs on message Q
  for(int i = 0; i < 5; ++i)
  {
    stringstream s;
    s << "My Msg " << i;
    const string payload = s.str();   // build the text once per message

    // The terminating NUL is not transmitted; the reader must terminate
    // the buffer itself.
    if(mq_send(msq_id, payload.c_str(), payload.size(), 0) < 0)
    {
      if(errno != EAGAIN)
      {
        cout << "Error on sending msg on MsgQ " << strerror(errno) << endl;
        mq_close(msq_id);
        exit(1);
      }
      // EAGAIN on a non-blocking descriptor: the queue is full and the
      // message was not enqueued. Say so instead of dropping it silently.
      cout << "Queue full, dropped msg " << payload << endl;
    }
    else
    {
      cout << "Sent msg " << payload << endl;
    }

    sleep(1); // Easily see the received message in reader
  }

  mq_close(msq_id);
}
错々过的事 2024-10-13 02:36:50

因为代码是手册页的副本,所以我会问:

  1. 您确定正确地将消息发送到队列吗?
  2. 首先尝试使用阻塞读取来读取,看看是否能得到一些东西。

在 Linux 下,您也可以使用 select/poll/epoll 来等待队列上的通知,
因为 mqd_t 就是普通的文件描述符。

Because the code is copied from the man page, I would ask:

  1. Are you sure you are sending messages to the queue correctly?
  2. Try first to read using blocking read and see if you get something.

Under Linux you can also use select/poll/epoll to wait for notifications about the queue,
because mqd_t is an ordinary file descriptor.

篱下浅笙歌 2024-10-13 02:36:50

还要检查队列中是否已经有消息。如果队列中已存在消息,则在队列被清空并且有新消息到达之前,mq_notify() 不会发出通知。
由于队列在多次程序运行之间是持久存在的,因此您需要确保在打开队列之前已经对它调用过 mq_unlink()。

Also check whether the queue already has messages. If messages exist on the queue, then mq_notify() will not deliver a notification until the queue is empty and a new message comes in.
Since the queue is persistent across program runs, you want to make sure mq_unlink() has already been called on the queue before opening it.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文