ZeroMQ 与 PHP 订阅者丢失消息

发布于 2024-12-26 18:07:11 字数 1902 浏览 5 评论 0原文

我使用 Zeromq 和 php 绑定来连接到 Freeswitch(一个 VOIP 软件交换机)中的 zmq 模块。

简而言之:我正在失去活动。 长的: Freeswitch 中的 zmq 模块是用 C++ 作为发布者实现的。 我的 PHP 代码如下:

<?php
  $context = new ZMQContext();

  echo "connect to freeswitch zmq module...";
  $sub = new ZMQSocket($context, ZMQ::SOCKET_SUB);
  $sub->setSockOpt(ZMQ::SOCKOPT_SUBSCRIBE,"");
  $sub->connect("tcp://192.168.20.73:5556");
  $hwm = $sub->getSockOpt(ZMQ::SOCKOPT_HWM);
  echo "ok.hwm: $hwm\n";

  echo "looping\n";
  while(1) {
  $data = $sub->recv();
  $d = json_decode($sub->recv(),TRUE);
  $event = $d["Event-Name"];
  $date = $d["Event-Date-Local"];
  $ts = $d["Event-Date-Timestamp"];
  $msgnr = $d["ZMQ-Msg-Cnt"];
  echo "PHP: $date msg# $msgnr $ts received $event\n";
}
?>

ZMQ-Msg-Cnt 是我在 freeswitch 中构建到 zmq 模块中的序列号。 我可以看到每第二条消息都会丢失。 tcpdump显示该消息被zmq接收。

我已将 PHP 代码转换为 C 代码,现在我能够接收每条消息。 C:

#include "zhelpers.h"
#include "cJSON.h"

int main (void)
{
    void *context = zmq_init (1);

    void *subscriber = zmq_socket (context, ZMQ_SUB);
    zmq_connect (subscriber, "tcp://192.168.20.73:5556");
    zmq_setsockopt (subscriber, ZMQ_SUBSCRIBE, "", 0);

    while (1) {
        char *string = s_recv (subscriber);

        cJSON *root = cJSON_Parse(string);
        int msgcnt = cJSON_GetObjectItem(root,"ZMQ-Msg-Cnt")->valueint;

        printf("C: %s msg# %s %s received %s\n",
            cJSON_GetObjectItem(root,"Event-Date-Local")->valuestring,
            cJSON_GetObjectItem(root,"ZMQ-Msg-Cnt")->valuestring,
            cJSON_GetObjectItem(root,"Event-Date-Timestamp")->valuestring,
            cJSON_GetObjectItem(root,"Event-Name")->valuestring
        );
        cJSON_Delete(root);
        free (string);
    }

zmq_close (subscriber);
zmq_term (context);
return 0;
}

PHP 代码有什么问题吗? PHP 有什么技巧/必须做/提示吗?

提前致谢, 杰拉尔德·韦伯

i'm using zeromq with php bindings to connect to the zmq module in Freeswitch (a VOIP Software Switch).

Short: i'm loosing Events.
Long:
The zmq module in Freeswitch is implemented in c++ as publisher.
My PHP Code is as follows:

<?php
  $context = new ZMQContext();

  echo "connect to freeswitch zmq module...";
  $sub = new ZMQSocket($context, ZMQ::SOCKET_SUB);
  $sub->setSockOpt(ZMQ::SOCKOPT_SUBSCRIBE,"");
  $sub->connect("tcp://192.168.20.73:5556");
  $hwm = $sub->getSockOpt(ZMQ::SOCKOPT_HWM);
  echo "ok.hwm: $hwm\n";

  echo "looping\n";
  while(1) {
  $data = $sub->recv();
  $d = json_decode($sub->recv(),TRUE);
  $event = $d["Event-Name"];
  $date = $d["Event-Date-Local"];
  $ts = $d["Event-Date-Timestamp"];
  $msgnr = $d["ZMQ-Msg-Cnt"];
  echo "PHP: $date msg# $msgnr $ts received $event\n";
}
?>

ZMQ-Msg-Cnt is a sequence number i've build into the zmq module in freeswitch.
I can see that every 2nd message is lost.
tcpdump shows that the message is received by zmq.

I've converted the PHP Code into C and now i'm able to receive every message.
C:

#include "zhelpers.h"
#include "cJSON.h"

int main (void)
{
    void *context = zmq_init (1);

    void *subscriber = zmq_socket (context, ZMQ_SUB);
    zmq_connect (subscriber, "tcp://192.168.20.73:5556");
    zmq_setsockopt (subscriber, ZMQ_SUBSCRIBE, "", 0);

    while (1) {
        char *string = s_recv (subscriber);

        cJSON *root = cJSON_Parse(string);
        int msgcnt = cJSON_GetObjectItem(root,"ZMQ-Msg-Cnt")->valueint;

        printf("C: %s msg# %s %s received %s\n",
            cJSON_GetObjectItem(root,"Event-Date-Local")->valuestring,
            cJSON_GetObjectItem(root,"ZMQ-Msg-Cnt")->valuestring,
            cJSON_GetObjectItem(root,"Event-Date-Timestamp")->valuestring,
            cJSON_GetObjectItem(root,"Event-Name")->valuestring
        );
        cJSON_Delete(root);
        free (string);
    }

zmq_close (subscriber);
zmq_term (context);
return 0;
}

Is there anything wrong with the PHP Code ?
Are there any tricks/must-do/hints for PHP ?

Thanks in advance,
gerald weber

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(1

娇柔作态 2025-01-02 18:07:11

您调用了 recv 函数两次,因此它正在加载一条消息,跳过对其的任何处理,然后加载第二条消息:

$data = $sub->recv();
// This is your first message, called in a blocking mode

$d = json_decode($sub->recv(),TRUE);
// and here's your second one, called in a non-blocking mode

将这两行更改为一行:

$d = json_decode($sub->recv());

You are calling the recv function twice, thus it is loading one message, skipping any processing on it, then loading the second one:

$data = $sub->recv();
// This is your first message, called in a blocking mode

$d = json_decode($sub->recv(),TRUE);
// and here's your second one, called in a non-blocking mode

Change these two lines to just one:

$d = json_decode($sub->recv());
~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文