MQTT客户端在发布消息时无限期地等待
我尝试使用 paho 库实现异步 MQTT 客户端,该客户端接收有关主题“请求”的消息,制定字符串并将响应放在主题“响应”上。我使用回调来处理传入的消息。
#include "mqtt/async_client.h"
#include "mqtt/topic.h"
const std::string SERVER_ADDRESS {"tcp://localhost:2883"};
const std::string CLIENT_ID {"test_client"};
class TestCallback : public virtual mqtt::callback
{
// the mqtt client
mqtt::async_client& cli_;
// (re)connection success
void connected(const std::string& cause) override
{
cli_.subscribe("request", 0);
}
// callback for when a message arrives.
void message_arrived(mqtt::const_message_ptr msg) override
{
if( msg->get_topic() == "request" )
{
/* format response message here and put it into (string) msg */
mqtt::message_ptr pubmsg = mqtt::make_message("response", msg);
pubmsg->set_qos(2);
//// PROBLEMATIC CODE ////
cli_.publish(pubmsg)->wait();
//////////////////////////
}
}
public:
TestCallback(mqtt::async_client& cli)
: cli_(cli) {}
};
int main(int argc, char** argv)
{
mqtt::async_client cli(SERVER_ADDRESS, CLIENT_ID);
TestCallback cb(cli);
cli.set_callback(cb);
mqtt::connect_options connOpts = mqtt::connect_options_builder()
.clean_session(false)
.automatic_reconnect()
.finalize();
try
{
cli.connect(connOpts)->wait();
}
catch (const mqtt::exception& exc)
{
std::cerr << "[ERROR] " << exc.what() << std::endl;
return 1;
}
// run until the application is shut down
while (std::tolower(std::cin.get()) != 'q')
;
try
{
cli.disconnect()->wait();
}
catch (const mqtt::exception& exc)
{
std::cerr << "[ERROR] " << exc.what() << std::endl;
return 1;
}
return 0;
}
当我尝试发布响应消息时,问题就出现了,因为客户端似乎无限期地等待。负责此操作的是 wait
函数,该函数用于令牌来跟踪已发布消息的状态 (参考)。据我了解,必须这样做,尤其是在使用更高级别的 QoS 时,以确保一切顺利。
删除对 wait()
的调用后,它会按预期工作。但我不确定这是否能保证消息的正确发布。
这样做的正确方法是什么?
I try to implement an asynchronous MQTT client with the paho library, that receives messages on topic "request", formulates a string and puts the response out on topic "response". I use the callbacks to handle the incoming messages.
#include "mqtt/async_client.h"
#include "mqtt/topic.h"
const std::string SERVER_ADDRESS {"tcp://localhost:2883"};
const std::string CLIENT_ID {"test_client"};
class TestCallback : public virtual mqtt::callback
{
// the mqtt client
mqtt::async_client& cli_;
// (re)connection success
void connected(const std::string& cause) override
{
cli_.subscribe("request", 0);
}
// callback for when a message arrives.
void message_arrived(mqtt::const_message_ptr msg) override
{
if( msg->get_topic() == "request" )
{
/* format response message here and put it into (string) msg */
mqtt::message_ptr pubmsg = mqtt::make_message("response", msg);
pubmsg->set_qos(2);
//// PROBLEMATIC CODE ////
cli_.publish(pubmsg)->wait();
//////////////////////////
}
}
public:
TestCallback(mqtt::async_client& cli)
: cli_(cli) {}
};
int main(int argc, char** argv)
{
mqtt::async_client cli(SERVER_ADDRESS, CLIENT_ID);
TestCallback cb(cli);
cli.set_callback(cb);
mqtt::connect_options connOpts = mqtt::connect_options_builder()
.clean_session(false)
.automatic_reconnect()
.finalize();
try
{
cli.connect(connOpts)->wait();
}
catch (const mqtt::exception& exc)
{
std::cerr << "[ERROR] " << exc.what() << std::endl;
return 1;
}
// run until the application is shut down
while (std::tolower(std::cin.get()) != 'q')
;
try
{
cli.disconnect()->wait();
}
catch (const mqtt::exception& exc)
{
std::cerr << "[ERROR] " << exc.what() << std::endl;
return 1;
}
return 0;
}
The problem arises when I try to publish the response message, as the client seems to wait indefinitely. Responsible for this is the wait
function which is used on a token to track the status of the published message (reference). To my understanding, this has to be done especially when using higher levels of QoS so ensure everything went well.
Upon removal of the call to wait()
, it works as expected. But I am not sure if this ensures the correct publishing of messages.
What is the correct way to do this?
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
data:image/s3,"s3://crabby-images/d5906/d59060df4059a6cc364216c4d63ceec29ef7fe66" alt="扫码二维码加入Web技术交流群"
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论
评论(2)
是的,@hardillb 是对的:问题是您无法从回调中对库进行阻塞调用。而
wait()
是一个阻塞调用,因此它会导致回调线程死锁。有一个线程处理来自 MQTT 连接的传入数据包,该线程用于调用回调。当您在 QoS 1 发布上调用
wait()
时,它会阻止输入处理,因此无法处理 PUBACK 来完成等待。如果您要使用回调,则需要“全力以赴”并使用额外的回调来指示发布的成功/失败完成。
老实说,我从来都不是回调驱动的异步 I/O 的忠实粉丝。它很混乱,并且给应用程序带来了线程同步的沉重负担。但 C++ 库的最初目标是使其与早期的 IBM Java 库相似。
我更喜欢 future/promise(异步/等待)风格。我认为如果/当该库有一个改进的 v2.0 时,它就会实现这种风格。
Yes, @hardillb is right: the problem is that you can not make a blocking call to the library from within a callback. And
wait()
is a blocking call, so it deadlocks the callback thread.There's a single thread processing the incoming packets from the MQTT connection, and that thread is used to invoke the callbacks. When you call
wait()
on a QoS 1 publish, it blocks the input processing, so it can't process the PUBACK to complete the wait.If you're going to use callbacks, you sort of need to go "all in" on them and use an additional callback to indicate success/failure completion of the publish.
Honestly, I was never a big fan of callback-driven asynchronous I/O; it's confusing and puts a heavy burden of thread synchronization on the app. But the initial goal of the C++ lib was to make it similar to the earlier IBM Java library.
I much prefer the future/promise (async/await) style. I think if/when there's a revamped v2.0 of the library it'll just implement that style.
我将在这里猜测,因为我真的不知道异步在C ++中的工作方式。
MQTT客户端具有一个单个消息处理线程,这处理了所有传入和发出的TCP数据包在插座上/出发时它们。当新的MQTT消息到达时,它将调用消息处理程序回调(
Message_arrived
),在该回调中,您调用发布
和wait
以完成。但是,由于调用等待
有效地阻止message_arrived
消息处理线程无法继续。这意味着它无法处理发布
完成所需的3腿QoS2握手,因此可以悬挂。我还会猜测,如果将发布更改为QoS 0,它将完成,但也将失败,因为这需要消息处理线程发送/接收多个消息以继续。
不等待发布可能是正确的解决方案。
I'm going to make a guess here, because I don't really know how async works in C++.
The MQTT client has a single message handling thread, this deals with all the incoming and outgoing TCP packets as they arrive/depart on the socket. When a new MQTT message arrives it then calls the message handler callback (
message_arrived
), in which you callpublish
andwait
for it to complete. But because the call towait
effectively blocksmessage_arrived
the message handling thread can not continue. This means it can not deal with the 3 legged QOS2 handshake required for thepublish
to complete, hence it hangs.I will also guess that if you changed the publish to QOS 0 it would complete, but would also fail with QOS 1 as that requires the message handling thread to send/receive multiple messages to continue.
Not waiting for the publish to complete is probably the correct solution.