How do I make SimpleRpcClient.Call() a blocking call to achieve synchronous communication with RabbitMQ?

Posted on 2024-11-06 10:44:17

In the .NET client for RabbitMQ (version 2.4.1), RabbitMQ.Client.MessagePatterns.SimpleRpcClient has a Call() method with these overloads:

    public virtual object[] Call(params object[] args);
    public virtual byte[] Call(byte[] body);
    public virtual byte[] Call(IBasicProperties requestProperties, byte[] body, out IBasicProperties replyProperties);

The problem:

Despite various attempts, the method does not block where I expect it to, so it is never able to handle the response.

The Question:

Am I missing something obvious in the setup of the SimpleRpcClient, or earlier with the IModel, IConnection, or even PublicationAddress?

More Info:

I've also tried various parameter configurations of the QueueDeclare() method, with no luck.

string QueueDeclare(string queue, bool durable, bool exclusive, bool autoDelete, IDictionary arguments);

Some more reference code of my setup of these:

IConnection conn = new ConnectionFactory{Address = "127.0.0.1"}.CreateConnection();
using (IModel ch = conn.CreateModel())
{
     var queueName = ch.QueueDeclare("t.qid", true, true, true, null);
     var client = new SimpleRpcClient(ch, queueName);

     ch.QueueBind(queueName, "exch", "", null);

     //HERE: does not block?
     var replyMessageBytes = client.Call(prop, msgToSend, out replyProp);
}

Looking elsewhere:

Or is it likely that there's an issue in my "server side" code? With or without BasicAck(), the client appears to have already continued execution.

Comments (1)

や莫失莫忘 2024-11-13 10:44:17

--SHORT ANSWER--

This is a bit of a "you're doing it wrong" situation...

Check how you handle IBasicProperties, and use SimpleRpcServer with HandleSimpleCall() on the server side.

If you stumble upon this question, you have probably taken the wrong approach, as I did, and may have made a similar mistake of manipulating IBasicProperties incorrectly, thereby hurting the ability of SimpleRpcServer to function correctly.
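
To illustrate why the message properties matter, here is a minimal in-process sketch of a request/reply exchange. It does not use RabbitMQ.Client at all: SketchProperties, SketchMessage and RpcSketch are hypothetical names invented for this illustration, and the queues are plain BlockingCollection instances. It only models the general idea that the caller blocks on a reply queue and the server has to route its reply using the properties that came with the request; it is not a description of how SimpleRpcClient or SimpleRpcServer are implemented.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

// Hypothetical stand-ins; these are NOT RabbitMQ.Client types.
public sealed class SketchProperties
{
    public string ReplyTo;       // name of the queue the reply should go to
    public string CorrelationId; // lets the caller match a reply to its request
}

public sealed class SketchMessage
{
    public SketchProperties Properties;
    public string Body;
}

public static class RpcSketch
{
    // Returns the reply body, or null if no reply arrived within timeoutMs.
    // serverHonoursReplyTo = false simulates a server that loses or ignores ReplyTo.
    public static string Call(string body, bool serverHonoursReplyTo, int timeoutMs)
    {
        var requests = new BlockingCollection<SketchMessage>();
        var queues = new ConcurrentDictionary<string, BlockingCollection<SketchMessage>>();
        var replyQueueName = "reply-" + Guid.NewGuid();
        var replies = new BlockingCollection<SketchMessage>();
        queues[replyQueueName] = replies;

        // "Server": take one request, build a reply, route it by ReplyTo.
        var server = Task.Factory.StartNew(() =>
        {
            var request = requests.Take();
            var reply = new SketchMessage
            {
                Properties = new SketchProperties { CorrelationId = request.Properties.CorrelationId },
                Body = "echo:" + request.Body
            };
            var target = serverHonoursReplyTo ? request.Properties.ReplyTo : null;
            BlockingCollection<SketchMessage> queue;
            if (target != null && queues.TryGetValue(target, out queue))
                queue.Add(reply);
            // Otherwise the reply has nowhere to go and is silently dropped.
        });

        // "Client": publish the request, then block on the reply queue.
        var correlationId = Guid.NewGuid().ToString();
        requests.Add(new SketchMessage
        {
            Properties = new SketchProperties { ReplyTo = replyQueueName, CorrelationId = correlationId },
            Body = body
        });

        SketchMessage received;
        if (!replies.TryTake(out received, timeoutMs))
            return null; // timed out: nothing was routed back to the caller
        server.Wait();
        return received.Properties.CorrelationId == correlationId ? received.Body : null;
    }
}
```

Calling RpcSketch.Call("ping", true, 2000) returns the echoed body once the simulated server replies, while passing false for the second argument makes the call time out and return null, because the reply is never routed back.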

--LONG ANSWER--

Working Sample for .NET:
Find my working example up on BitBucket here:

https://bitbucket.org/NickJosevski/synchronous-rabbitmq-sample-.net

Or here's a quick sample...

Client side:

IConnection conn = new ConnectionFactory{Address = "127.0.0.1"}.CreateConnection();
using (IModel ch = conn.CreateModel())
{
    ch.ExchangeDeclare(Helper.ExchangeName, "direct");

    var queueName = ch.EnsureQueue();

    var client = new SimpleRpcClient(ch, queueName);

    var msgToSend = new Message(/*data*/).Serialize();

    IBasicProperties replyProp;

    var reply = client.Call(new BasicProperties(), msgToSend, out replyProp);
}

Server side:

IConnection conn = new ConnectionFactory{Address = "127.0.0.1"}.CreateConnection();
using (IModel ch = conn.CreateModel())
{
    ch.ExchangeDeclare(Helper.ExchangeName, "direct");
    var queuename = ch.EnsureQueue();

    var subscription = new Subscription(ch, queuename);

    new MySimpleRpcServerSubclass(subscription).MainLoop();
}

internal class MySimpleRpcServerSubclass : SimpleRpcServer
{
    public MySimpleRpcServerSubclass(Subscription subscription) 
        : base(subscription) { }

    public override byte[] HandleSimpleCall(
        bool isRedelivered, IBasicProperties requestProperties, 
        byte[] body, out IBasicProperties replyProperties)
    {
        replyProperties = requestProperties;
        replyProperties.MessageId = Guid.NewGuid().ToString();

        var m = Message.Deserialize(body);
        var r = 
            new Response
            {
                Message = String.Format("Got {0} with {1}", m.Name, m.Body)
            };

        return r.Serialize();
    }
}

Shared:

//helper:
public static string EnsureQueue(this IModel ch)
{
    var queueName = ch.QueueDeclare(QueueId, false, false, false, null);

    ch.QueueBind(queueName, ExchangeName, "", null);

    return queueName;
}

Note: not all extension methods are explained here, such as Serialize(), as they're not relevant and just make for a cleaner example.
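
For readers who want something to compile the sample against, here is one possible shape for the omitted helpers. Everything in it is an assumption: the answer only implies that Message has Name and Body, that Response has a Message string, and that both can be turned into a byte array. The newline-separated UTF-8 encoding and the SerializationSketch class name are invented for this sketch and are not taken from the linked repository.

```csharp
using System;
using System.Text;

// Hypothetical message type; only Name and Body are implied by the answer.
public class Message
{
    public string Name { get; set; }
    public string Body { get; set; }

    // Rebuilds a Message from bytes produced by the Serialize() extension below.
    public static Message Deserialize(byte[] bytes)
    {
        var text = Encoding.UTF8.GetString(bytes);
        var split = text.IndexOf('\n');
        if (split < 0) throw new FormatException("Missing name/body separator.");
        return new Message { Name = text.Substring(0, split), Body = text.Substring(split + 1) };
    }
}

// Hypothetical response type carrying a single string.
public class Response
{
    public string Message { get; set; }
}

public static class SerializationSketch
{
    // Assumed wire format: "<Name>\n<Body>" as UTF-8. Name must not contain a newline.
    public static byte[] Serialize(this Message m)
    {
        return Encoding.UTF8.GetBytes(m.Name + "\n" + m.Body);
    }

    public static byte[] Serialize(this Response r)
    {
        return Encoding.UTF8.GetBytes(r.Message);
    }
}
```

A real project would more likely use an established serializer; the hand-rolled format here just keeps the sketch dependency-free.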