如何使用 Net::Stomp 和带有 receive_frame 的事务
我正在使用 Net::Stomp 调整一些现有代码,从能够处理单个主题到能够处理多个主题。谁能告诉我这种方法是否可行?它现在不起作用,因为它在期望交易收据的地方收到了关于另一个主题的第一条消息。在我开始尝试修复它之前,我想知道我是否只是在吠叫错误的树。
工作流程如下所示:
# first subscribe to three different queues
for $job (qw/ JOB1 JOB2 JOB3 /){
$stomp->subscribe({
"ack" => "client",
"destination" => "/queue/$job"
});
# listen on those three channels...
while($stomp->can_read){
$frame = $stomp->receive_frame();
# ... receives a message for JOB1
# and to start a transaction send a BEGIN frame that looks like this:
bless({
command => "BEGIN",
headers => {
receipt => "0002",
transaction => "0001",
},
}, "Net::Stomp::Frame")
# Then looks for a receipt on that frame by calling
$receipt = $stomp->receive_frame()
不幸的是,在它期望收到 RECEIPT 帧的地方,它实际上得到了在 JOB2 队列中等待的下一个 MESSAGE 帧。
我的问题是,有什么办法可以实现这一点,既可以订阅多个主题,又可以接收交易收据吗?或者有更好/更标准的方法来处理它吗?
任何提示或建议将非常受欢迎,谢谢!我也在 ActiveMQ 列表上交叉发布这个问题,希望没问题:-/
* update *
这是一个完整的重现案例:
use Net::Stomp;
use strict;
my $stomp = Net::Stomp->new( { hostname => 'bpdeb', port => '61612' } );
$stomp->connect( { login => 'hello', passcode => 'there' } );
# pre-populate the two queues
$stomp->send( { destination => '/queue/FOO.BAR', body => 'test message' } );
$stomp->send( { destination => '/queue/FOO.BAR2', body => 'test message' } );
# now subscribe to them
$stomp->subscribe({ destination => '/queue/FOO.BAR',
'ack' => 'client',
'activemq.prefetchSize' => 1
});
$stomp->subscribe({ destination => '/queue/FOO.BAR2',
'ack' => 'client',
'activemq.prefetchSize' => 1
});
# read one frame, then start a transaction asking for a receipt of the
# BEGIN message
while ($stomp->can_read()){
my $frame = $stomp->receive_frame;
print STDERR "got frame ".$frame->as_string()."\n";
print STDERR "sending a BEGIN\n";
my($frame) = Net::Stomp::Frame->new({
command => 'BEGIN',
headers => {
transaction => 123,
receipt => 456,
},
});
$stomp->send_frame($frame);
my $expected_receipt = $stomp->receive_frame;
print STDERR "expected RECEIPT but got ".$expected_receipt->as_string()."\n";
exit;
}
此输出(省略了详细信息)
got frame MESSAGE
destination:/queue/FOO.BAR
....
sending a BEGIN
expected RECEIPT but got MESSAGE
destination:/queue/FOO.BAR2
....
查看网络流量,一旦发送 SUBSCRIBE 请求,队列中的第一条消息就会通过线路发送到客户端。因此,当我发送 BEGIN 消息时,来自 FOO.BAR2 的第一条消息已经在客户端的网络缓冲区中等待,并且客户端直接从其缓冲区读取 FOO.BAR2。
所以要么我做错了什么,要么它不能以这种方式工作。
I'm in the process of adapting some existing code using Net::Stomp from being able to handle a single topic to being able to work on multiple topics. Can anyone tell me if this approach is even possible? It's not working now because where it expects a transaction receipt, it's getting the first message on another topic. I'd like to know if I'm just barking up the wrong tree before I go about trying to fix it.
Here's what the workflow looks like:
# first subscribe to three different queues
for $job (qw/ JOB1 JOB2 JOB3 /){
$stomp->subscribe({
"ack" => "client",
"destination" => "/queue/$job"
});
# listen on those three channels...
while($stomp->can_read){
$frame = $stomp->receive_frame();
# ... receives a message for JOB1
# and to start a transaction send a BEGIN frame that looks like this:
bless({
command => "BEGIN",
headers => {
receipt => "0002",
transaction => "0001",
},
}, "Net::Stomp::Frame")
# Then looks for a receipt on that frame by calling
$receipt = $stomp->receive_frame()
Unfortunately, where it's expecting a RECEIPT frame, it actually gets the next MESSAGE frame that's waiting in the JOB2 queue.
My question is, is there any way for that to work, to be both subscribed to multiple topics and to be able to receive receipts on transactions? Or is there a better/more standard way to handle it?
Any tips or suggestions would be most welcome, thanks! I'm cross-posting this question on the ActiveMQ list too, hope that's ok :-/
* update *
Here's a complete repro case:
use Net::Stomp;
use strict;
my $stomp = Net::Stomp->new( { hostname => 'bpdeb', port => '61612' } );
$stomp->connect( { login => 'hello', passcode => 'there' } );
# pre-populate the two queues
$stomp->send( { destination => '/queue/FOO.BAR', body => 'test message' } );
$stomp->send( { destination => '/queue/FOO.BAR2', body => 'test message' } );
# now subscribe to them
$stomp->subscribe({ destination => '/queue/FOO.BAR',
'ack' => 'client',
'activemq.prefetchSize' => 1
});
$stomp->subscribe({ destination => '/queue/FOO.BAR2',
'ack' => 'client',
'activemq.prefetchSize' => 1
});
# read one frame, then start a transaction asking for a receipt of the
# BEGIN message
while ($stomp->can_read()){
my $frame = $stomp->receive_frame;
print STDERR "got frame ".$frame->as_string()."\n";
print STDERR "sending a BEGIN\n";
my($frame) = Net::Stomp::Frame->new({
command => 'BEGIN',
headers => {
transaction => 123,
receipt => 456,
},
});
$stomp->send_frame($frame);
my $expected_receipt = $stomp->receive_frame;
print STDERR "expected RECEIPT but got ".$expected_receipt->as_string()."\n";
exit;
}
This outputs (with the details elided)
got frame MESSAGE
destination:/queue/FOO.BAR
....
sending a BEGIN
expected RECEIPT but got MESSAGE
destination:/queue/FOO.BAR2
....
Looking at the network traffic, as soon as a SUBSCRIBE request is sent, the first message in the queue goes over the wire to the client. So the first message from FOO.BAR2 is already waiting in the client's network buffer when I send the BEGIN message, and the client reads the FOO.BAR2 straight from its buffer.
So either I'm doing something wrong, or it can't work this way.
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论
评论(1)
好的,我尝试了一下,效果很好。但你是收到帧的人。那么服务器为什么要给你发送一个收据帧呢?
您正在设置
"ack" =>; “client”
,这意味着服务器将认为该帧“未交付”,除非您另有说明。只需将$receipt = $stomp->receive_frame()
行更改为$stomp->ack( { frame => $frame } );
即可。更新
啊,好吧,您想通过使用事务来保护
ack
。那么让我们看一下来源:有一个方法send_transactional
可能可以完成您想要做的事情(但它使用SEND
帧而不是ACK
)。也许您还应该查看从 cloudmark 提交的补丁 ,它为模块添加了几个“安全功能”(不幸的是,当我问模块作者时,他没有说任何关于合并该补丁的事情)。
Ok, I tried it and it works fine. But you are the one receiving a frame. So why should the server send you a receipt frame?
You're setting
"ack" => "client"
and that means, that the server will consider the frame as "not delivered" until you say otherwise. Just change the line$receipt = $stomp->receive_frame()
to$stomp->ack( { frame => $frame } );
.Update
Ah, ok you want to secure the
ack
by using a transaction. So let's have a look at the source: There is a methodsend_transactional
which does probably what you want to do (but it's using aSEND
frame instead ofACK
).Perhaps you should also have a look at the submitted patch from cloudmark, which adds several "security features" to the module (unfortunately the module author didn't say anything about merging that patch, when i asked him).