从多个队列读取,RabbitMQ

发布于 2024-11-19 20:23:40 字数 9480 浏览 6 评论 0原文

我是 RabbitMQ 的新手。当有多个队列(要读取)时,我希望能够在不阻塞的情况下处理读取消息。有关我如何做到这一点的任何意见?

//编辑1

public class Rabbit : IMessageBus
{   

    private List<string> publishQ = new List<string>();
    private List<string> subscribeQ = new List<string>();

    ConnectionFactory factory = null;
    IConnection connection = null;
    IModel channel = null;  
    Subscription sub = null;

    public void writeMessage( Measurement m1 ) {
        byte[] body = Measurement.AltSerialize( m1 );
        int msgCount = 1;
        Console.WriteLine("Sending message to queue {1} via the amq.direct exchange.", m1.id);

        string finalQueue = publishToQueue( m1.id );

        while (msgCount --> 0) {
            channel.BasicPublish("amq.direct", finalQueue, null, body);
        }

        Console.WriteLine("Done. Wrote the message to queue {0}.\n", m1.id);
    }

     public string publishToQueue(string firstQueueName) {
        Console.WriteLine("Creating a queue and binding it to amq.direct");
        string queueName = channel.QueueDeclare(firstQueueName, true, false, false, null);
        channel.QueueBind(queueName, "amq.direct", queueName, null);
        Console.WriteLine("Done.  Created queue {0} and bound it to amq.direct.\n", queueName);
        return queueName;
    }


    public Measurement readMessage() {
        Console.WriteLine("Receiving message...");
        Measurement m = new Measurement();

        int i = 0;
        foreach (BasicDeliverEventArgs ev in sub) {
            m = Measurement.AltDeSerialize(ev.Body);
            //m.id = //get the id here, from sub
            if (++i == 1)
                break;
            sub.Ack();
        }

        Console.WriteLine("Done.\n");
        return m;
    }


    public void subscribeToQueue(string queueName ) 
    {
        sub = new Subscription(channel, queueName);
    }

    public static string MsgSysName;
    public string MsgSys
    {
        get 
        { 
            return MsgSysName;
        }
        set
        {
            MsgSysName = value;
        }
    }

    public Rabbit(string _msgSys) //Constructor
    {   
        factory = new ConnectionFactory();
        factory.HostName = "localhost"; 
        connection = factory.CreateConnection();
        channel = connection.CreateModel();
        //consumer = new QueueingBasicConsumer(channel);

        System.Console.WriteLine("\nMsgSys: RabbitMQ");
        MsgSys = _msgSys;
    }

    ~Rabbit()
    {
        //observer??
        connection.Dispose();
        //channel.Dispose();
        System.Console.WriteLine("\nDestroying RABBIT");
    }   
}

//编辑2

private List<Subscription> subscriptions = new List<Subscription>();
    Subscription sub = null;

public Measurement readMessage()
    {
        Measurement m = new Measurement();
        foreach(Subscription element in subscriptions)
        {
            foreach (BasicDeliverEventArgs ev in element) {
                //ev = element.Next();
                if( ev != null) {
                    m = Measurement.AltDeSerialize( ev.Body );
                    return m;
                }
                m =  null;  
            }           
        }   
        System.Console.WriteLine("No message in the queue(s) at this time.");
        return m;
    }

    public void subscribeToQueue(string queueName) 
    {   
        sub = new Subscription(channel, queueName);
        subscriptions.Add(sub);     
    }

//编辑3

//MessageHandler.cs

public class MessageHandler
{   
    // Implementation of methods for Rabbit class go here
    private List<string> publishQ = new List<string>();
    private List<string> subscribeQ = new List<string>();

    ConnectionFactory factory = null;
    IConnection connection = null;
    IModel channel = null;  
    QueueingBasicConsumer consumer = null;  

    private List<Subscription> subscriptions = new List<Subscription>();
    Subscription sub = null;

    public void writeMessage ( Measurement m1 )
    {
        byte[] body = Measurement.AltSerialize( m1 );
        //declare a queue if it doesn't exist
        publishToQueue(m1.id);

        channel.BasicPublish("amq.direct", m1.id, null, body);
        Console.WriteLine("\n  [x] Sent to queue {0}.", m1.id);
    }

    public void publishToQueue(string queueName)
    {   
        string finalQueueName = channel.QueueDeclare(queueName, true, false, false, null);
        channel.QueueBind(finalQueueName, "amq.direct", "", null);
    }

    public Measurement readMessage()
    {
        Measurement m = new Measurement();
        foreach(Subscription element in subscriptions)
        {
            if( element.QueueName == null)
            {
                m = null;
            }
            else 
            {
                BasicDeliverEventArgs ev = element.Next();
                if( ev != null) {
                    m = Measurement.AltDeSerialize( ev.Body );
                    m.id = element.QueueName;
                    element.Ack();
                    return m;
                }
                m =  null;                      
            }
            element.Ack();
        }   
        System.Console.WriteLine("No message in the queue(s) at this time.");
        return m;
    }

    public void subscribeToQueue(string queueName) 
    {   
        sub = new Subscription(channel, queueName);
        subscriptions.Add(sub); 
    }

    public static string MsgSysName;
    public string MsgSys
    {
        get 
        { 
            return MsgSysName;
        }
        set
        {
            MsgSysName = value;
        }
    }

    public MessageHandler(string _msgSys) //Constructor
    {   
        factory = new ConnectionFactory();
        factory.HostName = "localhost"; 
        connection = factory.CreateConnection();
        channel = connection.CreateModel();
        consumer = new QueueingBasicConsumer(channel);

        System.Console.WriteLine("\nMsgSys: RabbitMQ");
        MsgSys = _msgSys;
    }

    public void disposeAll()
    {
        connection.Dispose();
        channel.Dispose();
        foreach(Subscription element in subscriptions)
        {
            element.Close();
        }
        System.Console.WriteLine("\nDestroying RABBIT");
    }   
}

//App1.cs

using System;
using System.IO;

using UtilityMeasurement;
using UtilityMessageBus;


public class MainClass
{
    public static void Main()
    {

    MessageHandler obj1 = MessageHandler("Rabbit");

    System.Console.WriteLine("\nA {0} object is now created.", MsgSysName);

    //Create new Measurement messages
    Measurement m1 = new Measurement("q1", 2345, 23.456); 
    Measurement m2 = new Measurement("q2", 222, 33.33);

    System.Console.WriteLine("Test message 1:\n    ID: {0}", m1.id);
    System.Console.WriteLine("    Time: {0}", m1.time);
    System.Console.WriteLine("    Value: {0}", m1.value);

    System.Console.WriteLine("Test message 2:\n    ID: {0}", m2.id);
    System.Console.WriteLine("    Time: {0}", m2.time);
    System.Console.WriteLine("    Value: {0}", m2.value);   

    // Ask queue name and store it
    System.Console.WriteLine("\nName of queue to publish to: ");
    string queueName = (System.Console.ReadLine()).ToString();
    obj1.publishToQueue( queueName );

    // Write message to the queue
    obj1.writeMessage( m1 );    

    System.Console.WriteLine("\nName of queue to publish to: ");
    string queueName2 = (System.Console.ReadLine()).ToString();
    obj1.publishToQueue( queueName2 );

    obj1.writeMessage( m2 );

    obj1.disposeAll();
}
}

//App2.cs

using System;
using System.IO;

using UtilityMeasurement;
using UtilityMessageBus;

public class MainClass
{
    public static void Main()
    {
    //Asks for the message system
    System.Console.WriteLine("\nEnter name of messageing system: ");
    System.Console.WriteLine("Usage: [Rabbit] [Zmq]");
    string MsgSysName = (System.Console.ReadLine()).ToString();

    //Declare an IMessageBus instance:
    //Here, an object of the corresponding Message System
        // (ex. Rabbit, Zmq, etc) is instantiated
    IMessageBus obj1 = MessageBusFactory.GetMessageBus(MsgSysName);

    System.Console.WriteLine("\nA {0} object is now created.", MsgSysName);

    //Create a new Measurement object m
    Measurement m = new Measurement();  

    System.Console.WriteLine("Queue name to subscribe to: ");
    string QueueName1 = (System.Console.ReadLine()).ToString();
    obj1.subscribeToQueue( QueueName1 );

    //Read message into m
    m = obj1.readMessage();

    if (m != null ) {
        System.Console.WriteLine("\nMessage received from queue {0}:\n    ID: {1}", m.id, m.id);
        System.Console.WriteLine("    Time: {0}", m.time);
        System.Console.WriteLine("    Value: {0}", m.value);
    }

    System.Console.WriteLine("Another queue name to subscribe to: ");
    string QueueName2 = (System.Console.ReadLine()).ToString();
    obj1.subscribeToQueue( QueueName2 );

    m = obj1.readMessage();

    if (m != null ) {
        System.Console.WriteLine("\nMessage received from queue {0}:\n    ID: {1}", m.id, m.id);
        System.Console.WriteLine("    Time: {0}", m.time);
        System.Console.WriteLine("    Value: {0}", m.value);
    }

    obj1.disposeAll();
}
}

I am new to RabbitMQ. I want to be able to handle reading messages without blocking when there are multiple queues (to read from). Any inputs on how I can do that?

//Edit 1

public class Rabbit : IMessageBus
{   

    private List<string> publishQ = new List<string>();
    private List<string> subscribeQ = new List<string>();

    ConnectionFactory factory = null;
    IConnection connection = null;
    IModel channel = null;  
    Subscription sub = null;

    public void writeMessage( Measurement m1 ) {
        byte[] body = Measurement.AltSerialize( m1 );
        int msgCount = 1;
        Console.WriteLine("Sending message to queue {1} via the amq.direct exchange.", m1.id);

        string finalQueue = publishToQueue( m1.id );

        while (msgCount --> 0) {
            channel.BasicPublish("amq.direct", finalQueue, null, body);
        }

        Console.WriteLine("Done. Wrote the message to queue {0}.\n", m1.id);
    }

     public string publishToQueue(string firstQueueName) {
        Console.WriteLine("Creating a queue and binding it to amq.direct");
        string queueName = channel.QueueDeclare(firstQueueName, true, false, false, null);
        channel.QueueBind(queueName, "amq.direct", queueName, null);
        Console.WriteLine("Done.  Created queue {0} and bound it to amq.direct.\n", queueName);
        return queueName;
    }


    public Measurement readMessage() {
        Console.WriteLine("Receiving message...");
        Measurement m = new Measurement();

        int i = 0;
        foreach (BasicDeliverEventArgs ev in sub) {
            m = Measurement.AltDeSerialize(ev.Body);
            //m.id = //get the id here, from sub
            if (++i == 1)
                break;
            sub.Ack();
        }

        Console.WriteLine("Done.\n");
        return m;
    }


    public void subscribeToQueue(string queueName ) 
    {
        sub = new Subscription(channel, queueName);
    }

    public static string MsgSysName;
    public string MsgSys
    {
        get 
        { 
            return MsgSysName;
        }
        set
        {
            MsgSysName = value;
        }
    }

    public Rabbit(string _msgSys) //Constructor
    {   
        factory = new ConnectionFactory();
        factory.HostName = "localhost"; 
        connection = factory.CreateConnection();
        channel = connection.CreateModel();
        //consumer = new QueueingBasicConsumer(channel);

        System.Console.WriteLine("\nMsgSys: RabbitMQ");
        MsgSys = _msgSys;
    }

    ~Rabbit()
    {
        //observer??
        connection.Dispose();
        //channel.Dispose();
        System.Console.WriteLine("\nDestroying RABBIT");
    }   
}

//Edit 2

private List<Subscription> subscriptions = new List<Subscription>();
    Subscription sub = null;

public Measurement readMessage()
    {
        Measurement m = new Measurement();
        foreach(Subscription element in subscriptions)
        {
            foreach (BasicDeliverEventArgs ev in element) {
                //ev = element.Next();
                if( ev != null) {
                    m = Measurement.AltDeSerialize( ev.Body );
                    return m;
                }
                m =  null;  
            }           
        }   
        System.Console.WriteLine("No message in the queue(s) at this time.");
        return m;
    }

    public void subscribeToQueue(string queueName) 
    {   
        sub = new Subscription(channel, queueName);
        subscriptions.Add(sub);     
    }

//Edit 3

//MessageHandler.cs

public class MessageHandler
{   
    // Implementation of methods for Rabbit class go here
    private List<string> publishQ = new List<string>();
    private List<string> subscribeQ = new List<string>();

    ConnectionFactory factory = null;
    IConnection connection = null;
    IModel channel = null;  
    QueueingBasicConsumer consumer = null;  

    private List<Subscription> subscriptions = new List<Subscription>();
    Subscription sub = null;

    public void writeMessage ( Measurement m1 )
    {
        byte[] body = Measurement.AltSerialize( m1 );
        //declare a queue if it doesn't exist
        publishToQueue(m1.id);

        channel.BasicPublish("amq.direct", m1.id, null, body);
        Console.WriteLine("\n  [x] Sent to queue {0}.", m1.id);
    }

    public void publishToQueue(string queueName)
    {   
        string finalQueueName = channel.QueueDeclare(queueName, true, false, false, null);
        channel.QueueBind(finalQueueName, "amq.direct", "", null);
    }

    public Measurement readMessage()
    {
        Measurement m = new Measurement();
        foreach(Subscription element in subscriptions)
        {
            if( element.QueueName == null)
            {
                m = null;
            }
            else 
            {
                BasicDeliverEventArgs ev = element.Next();
                if( ev != null) {
                    m = Measurement.AltDeSerialize( ev.Body );
                    m.id = element.QueueName;
                    element.Ack();
                    return m;
                }
                m =  null;                      
            }
            element.Ack();
        }   
        System.Console.WriteLine("No message in the queue(s) at this time.");
        return m;
    }

    public void subscribeToQueue(string queueName) 
    {   
        sub = new Subscription(channel, queueName);
        subscriptions.Add(sub); 
    }

    public static string MsgSysName;
    public string MsgSys
    {
        get 
        { 
            return MsgSysName;
        }
        set
        {
            MsgSysName = value;
        }
    }

    public MessageHandler(string _msgSys) //Constructor
    {   
        factory = new ConnectionFactory();
        factory.HostName = "localhost"; 
        connection = factory.CreateConnection();
        channel = connection.CreateModel();
        consumer = new QueueingBasicConsumer(channel);

        System.Console.WriteLine("\nMsgSys: RabbitMQ");
        MsgSys = _msgSys;
    }

    public void disposeAll()
    {
        connection.Dispose();
        channel.Dispose();
        foreach(Subscription element in subscriptions)
        {
            element.Close();
        }
        System.Console.WriteLine("\nDestroying RABBIT");
    }   
}

//App1.cs

using System;
using System.IO;

using UtilityMeasurement;
using UtilityMessageBus;


public class MainClass
{
    public static void Main()
    {

    MessageHandler obj1 = MessageHandler("Rabbit");

    System.Console.WriteLine("\nA {0} object is now created.", MsgSysName);

    //Create new Measurement messages
    Measurement m1 = new Measurement("q1", 2345, 23.456); 
    Measurement m2 = new Measurement("q2", 222, 33.33);

    System.Console.WriteLine("Test message 1:\n    ID: {0}", m1.id);
    System.Console.WriteLine("    Time: {0}", m1.time);
    System.Console.WriteLine("    Value: {0}", m1.value);

    System.Console.WriteLine("Test message 2:\n    ID: {0}", m2.id);
    System.Console.WriteLine("    Time: {0}", m2.time);
    System.Console.WriteLine("    Value: {0}", m2.value);   

    // Ask queue name and store it
    System.Console.WriteLine("\nName of queue to publish to: ");
    string queueName = (System.Console.ReadLine()).ToString();
    obj1.publishToQueue( queueName );

    // Write message to the queue
    obj1.writeMessage( m1 );    

    System.Console.WriteLine("\nName of queue to publish to: ");
    string queueName2 = (System.Console.ReadLine()).ToString();
    obj1.publishToQueue( queueName2 );

    obj1.writeMessage( m2 );

    obj1.disposeAll();
}
}

//App2.cs

using System;
using System.IO;

using UtilityMeasurement;
using UtilityMessageBus;

public class MainClass
{
    public static void Main()
    {
    //Asks for the message system
    System.Console.WriteLine("\nEnter name of messageing system: ");
    System.Console.WriteLine("Usage: [Rabbit] [Zmq]");
    string MsgSysName = (System.Console.ReadLine()).ToString();

    //Declare an IMessageBus instance:
    //Here, an object of the corresponding Message System
        // (ex. Rabbit, Zmq, etc) is instantiated
    IMessageBus obj1 = MessageBusFactory.GetMessageBus(MsgSysName);

    System.Console.WriteLine("\nA {0} object is now created.", MsgSysName);

    //Create a new Measurement object m
    Measurement m = new Measurement();  

    System.Console.WriteLine("Queue name to subscribe to: ");
    string QueueName1 = (System.Console.ReadLine()).ToString();
    obj1.subscribeToQueue( QueueName1 );

    //Read message into m
    m = obj1.readMessage();

    if (m != null ) {
        System.Console.WriteLine("\nMessage received from queue {0}:\n    ID: {1}", m.id, m.id);
        System.Console.WriteLine("    Time: {0}", m.time);
        System.Console.WriteLine("    Value: {0}", m.value);
    }

    System.Console.WriteLine("Another queue name to subscribe to: ");
    string QueueName2 = (System.Console.ReadLine()).ToString();
    obj1.subscribeToQueue( QueueName2 );

    m = obj1.readMessage();

    if (m != null ) {
        System.Console.WriteLine("\nMessage received from queue {0}:\n    ID: {1}", m.id, m.id);
        System.Console.WriteLine("    Time: {0}", m.time);
        System.Console.WriteLine("    Value: {0}", m.value);
    }

    obj1.disposeAll();
}
}

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(2

别闹i 2024-11-26 20:23:40

两个信息来源:

  1. http://lists.rabbitmq.com /cgi-bin/mailman/listinfo/rabbitmq-discuss

  2. 你应该首先尝试理解这些示例。

    • %Program Files%\RabbitMQ\DotNetClient\examples\src(基本示例)

    • 从 Mercurial 存储库(C# 项目)获取完整的工作示例。

需要理解的有用操作:

  • 声明/断言/监听/订阅/发布

回复:
你的问题——没有理由不能拥有多个听众。或者,您可以通过“交换”上的一个侦听器订阅 n 个路由路径。

** 回复:非阻塞 **

典型的侦听器一次消费一条消息。您可以将它们从队列中拉出,或者它们将以“窗口”方式自动放置在靠近消费者的位置(通过服务质量 qos 参数定义)。这种方法的美妙之处在于,很多艰苦的工作已经为您完成(例如:可靠性、有保证的交付等)。

RabbitMQ 的一个关键功能是,如果处理过程中出现错误,则消息将重新添加回队列(容错功能)。

需要进一步了解您的情况。

通常,如果您发布到我上面提到的列表,您就可以找到 RabbitMQ 的工作人员。他们非常有帮助。

希望能有所帮助。一开始需要花很多心思,但值得坚持下去。


问答

参见:http://www.rabbitmq.com/faq.html

问:可以使用 new Subscription(channel,queueName) 订阅多个队列吗?

是的。您可以使用绑定密钥,例如 abc.*.hij 或 abc.#.hij,也可以附加多个绑定。前者假设您已经围绕某种对您有意义的原则设计了路由键(请参阅常见问题解答中的路由键)。对于后者,您需要绑定到多个队列。

手动实现 n 绑定。
请参阅:http://hg.rabbitmq.com/rabbitmq-dotnet-client/file/default/projects/client/RabbitMQ.Client/src/client/messagepatterns/Subscription.cs

没有太多此模式背后的代码,因此如果通配符不够,您可以推出自己的订阅模式。您可以从此类继承并添加另一个方法来进行额外的绑定...可能这会起作用或接近于此(未经测试)。

AQMP 规范表示可以进行多个手动绑定: http: //www.rabbitmq.com/amqp-0-9-1-reference.html#queue.bind

问。如果是这样,我如何遍历所有订阅的队列并返回消息(没有消息时返回 null)?

对于订阅者,当消息可用时您会收到通知。否则,您所描述的是一个拉取界面,您可以根据请求拉取消息。如果没有可用的消息,您将根据需要得到一个空值。顺便说一句:Notify 方法可能更方便。

问。哦,请注意,我以不同的方法进行所有这些操作。我将编辑我的帖子以反映代码

实时代码:

此版本必须使用通配符来订阅多个路由键

n 使用订阅的手动路由键留给读者作为练习。 ;-) 我认为无论如何你都倾向于拉动界面。顺便说一句:拉动接口的效率低于通知接口。

        using (Subscription sub = new Subscription(ch, QueueNme))
        {
            foreach (BasicDeliverEventArgs ev in sub)
            {
                Process(ev.Body);

        ...

注意:foreach使用IEnumerable,IEnumerable通过“yield”语句包装新消息到达的事件。实际上它是一个无限循环。

--- 更新

AMQP 的设计理念是保持 TCP 连接数量与应用程序数量一样低,这意味着每个连接可以拥有多个通道。

这个问题(编辑3)中的代码尝试使用两个订阅者和一个频道,而它应该(我相信)是每个线程每个频道一个订阅者,以避免锁定问题。建议:使用路由键“通配符”。可以使用 java 客户端订阅多个不同的队列名称,但据我所知,.net 客户端并未在订阅者帮助程序类中实现此功能。

如果您确实需要在同一订阅线程上使用两个不同的队列名称,则建议 .net 使用以下拉取序列:

        using (IModel ch = conn.CreateModel()) {    // btw: no reason to close the channel afterwards IMO
            conn.AutoClose = true;                  // no reason to closs the connection either.  Here for completeness.

            ch.QueueDeclare(queueName);
            BasicGetResult result = ch.BasicGet(queueName, false);
            if (result == null) {
                Console.WriteLine("No message available.");
            } else {
                ch.BasicAck(result.DeliveryTag, false);
                Console.WriteLine("Message:");
            }

            return 0;
        }

-- UPDATE 2:

来自 RabbitMQ 列表:

“假设 element.Next() 在其中一个订阅上阻塞。
您可以从每个订阅中检索交付,并设置超时时间
读过去。或者,您可以设置一个队列来接收
所有测量并通过单个订阅从中检索消息。”(Emile)

这意味着当第一个队列为空时,.Next() 会阻塞等待下一条消息出现。即订阅者有一个等待-内置下一条消息。

-- 更新 3:

在 .net 下,使用 QueueingBasicConsumer 从多个队列进行消费。

实际上,这里有一个关于它的线程,以了解用法:

等待单个 RabbitMQ 消息并超时

-- UPDATE4 :

有关 .QueueingBasicConsumer 的更多信息

这里有示例代码

。 href="http://www.rabbitmq.com/releases/rabbitmq-dotnet-client/v1.4.0/rabbitmq-dotnet-client-1.4.0-net-2.0-htmldoc/type-RabbitMQ.Client.QueueingBasicConsumer.html ” rel="noreferrer">http://www.rabbitmq.com/releases/rabbitmq-dotnet-client/v1.4.0/rabbitmq-dotnet-client-1.4.0-net-2.0-htmldoc/type-RabbitMQ.Client。 QueueingBasicConsumer.html

示例经过一些修改复制到答案中(请参阅 //<-----)。

                IModel channel = ...;
            QueueingBasicConsumer consumer = new QueueingBasicConsumer(channel);
            channel.BasicConsume(queueName, false, null, consumer);  //<-----
            channel.BasicConsume(queueName2, false, null, consumer); //<-----
            // etc. channel.BasicConsume(queueNameN, false, null, consumer);  //<-----

            // At this point, messages will be being asynchronously delivered,
            // and will be queueing up in consumer.Queue.

            while (true) {
                try {
                    BasicDeliverEventArgs e = (BasicDeliverEventArgs) consumer.Queue.Dequeue();
                    // ... handle the delivery ...
                    channel.BasicAck(e.DeliveryTag, false);
                } catch (EndOfStreamException ex) {
                    // The consumer was cancelled, the model closed, or the
                    // connection went away.
                    break;
                }
            }

-- 更新 5:一个可以作用于任何队列的简单 get(较慢,但有时更方便的方法)。

            ch.QueueDeclare(queueName);
            BasicGetResult result = ch.BasicGet(queueName, false);
            if (result == null) {
                Console.WriteLine("No message available.");
            } else {
                ch.BasicAck(result.DeliveryTag, false);
                Console.WriteLine("Message:"); 
                // deserialize body and display extra info here.
            }

two sources of info:

  1. http://lists.rabbitmq.com/cgi-bin/mailman/listinfo/rabbitmq-discuss

  2. You should really try to understand the examples first.

    • %Program Files%\RabbitMQ\DotNetClient\examples\src (basic examples)

    • get full working examples from their Mercurial repository (c# projects).

Useful operations to understand:

  • Declare / Assert / Listen / Subscribe / Publish

Re:
your question -- there's no reason why you can't have multiple listenners. Or you could subscribe to n routing paths with one listenner on an "exchange".

** re: non-blocking **

A typical listenner consumes messages one at a time. You can pull them off the queue, or they will automatically be placed close to the consumer in a 'windowed' fashion (defined through quality of service qos parameters). The beauty of the approach is that a lot of hard work is done for you (re: reliability, guaranteed delivery, etc.).

A key feature of RabbitMQ, is that if there is an error in processing, then the message is re-added back into the queue (a fault tolerance feature).

Need to know more about you situation.

Often if you post to the list I mentioned above, you can get hold of someone on staff at RabbitMQ. They're very helpful.

Hope that helps a little. It's a lot to get your head around at first, but it is worth persisting with.


Q&A

see: http://www.rabbitmq.com/faq.html

Q. Can you subscribe to multiple queues using new Subscription(channel, queueName) ?

Yes. You either use a binding key e.g. abc.*.hij, or abc.#.hij, or you attach multiple bindings. The former assumes that you have designed your routing keys around some kind of principle that makes sense for you (see routing keys in the FAQ). For the latter, you need to bind to more than one queue.

Implementing n-bindings manually.
see: http://hg.rabbitmq.com/rabbitmq-dotnet-client/file/default/projects/client/RabbitMQ.Client/src/client/messagepatterns/Subscription.cs

there's not much code behind this pattern, so you could roll your own subscription pattern if wildcards are not enough. you could inherit from this class and add another method for additional bindings... probably this will work or something close to this (untested).

The AQMP spec says that multiple manual binding are possible: http://www.rabbitmq.com/amqp-0-9-1-reference.html#queue.bind

Q. And if so, how can I go through all the subscribed queues and return a message (null when no messages)?

With a subscriber you are notified when a message is available. Otherwise what you are describing is a pull interface where you pull the message down on request. If no messages available, you'll get a null as you'd like. btw: the Notify method is probably more convenient.

Q. Oh, and mind you that that I have all this operations in different methods. I will edit my post to reflect the code

Live code:

this version must use wild cards to subscribe to more than one routing key

n manual routing keys using subscription is left as an exercise for the reader. ;-) I think you were leaning towards a pull interface anyway. btw: pull interfaces are less efficient than notify ones.

        using (Subscription sub = new Subscription(ch, QueueNme))
        {
            foreach (BasicDeliverEventArgs ev in sub)
            {
                Process(ev.Body);

        ...

Note: the foreach uses IEnumerable, and IEnumerable wraps the event that a new message has arrived through the "yield" statement. Effectively it is an infinite loop.

--- UPDATE

AMQP was designed with the idea of keeping the number of TCP connections as low as the number of applications, so that means you can have many channels per connection.

the code in this question (edit 3) tries to use two subscribers with one channel, whereas it should (I believe), be one subscriber per channel per thread to avoid locking issues. Sugestion: use a routing key "wildcard". It is possible to subscribe to more than one distinct queue names with the java client, but the .net client does not to my knowledge have this implemented in the Subscriber helper class.

If you really do need two distinct queue names on the same subscription thread, then the following pull sequence is suggested for .net:

        using (IModel ch = conn.CreateModel()) {    // btw: no reason to close the channel afterwards IMO
            conn.AutoClose = true;                  // no reason to closs the connection either.  Here for completeness.

            ch.QueueDeclare(queueName);
            BasicGetResult result = ch.BasicGet(queueName, false);
            if (result == null) {
                Console.WriteLine("No message available.");
            } else {
                ch.BasicAck(result.DeliveryTag, false);
                Console.WriteLine("Message:");
            }

            return 0;
        }

-- UPDATE 2:

from RabbitMQ list:

"assume that element.Next() is blocking on one of the subscriptions.
You could retrieve deliveries from each subscription with a timeout to
read past it. Alternatively you could set up a single queue to receive
all measurements and retrieve messages from it with a single subscription." (Emile)

What that means is that when the first queue is empty, .Next() blocks waiting for the next message to appear. i.e. the subscriber has a wait-for-next-message built in.

-- UPDATE 3:

under .net, use the QueueingBasicConsumer for consumption from multiple queues.

Actually here's a thread about it to get a feel on usage:

Wait for a single RabbitMQ message with a timeout

-- UPDATE4:

some more info on the .QueueingBasicConsumer

There's example code here.

http://www.rabbitmq.com/releases/rabbitmq-dotnet-client/v1.4.0/rabbitmq-dotnet-client-1.4.0-net-2.0-htmldoc/type-RabbitMQ.Client.QueueingBasicConsumer.html

example copied into the answer with a few modifications (see //<-----).

                IModel channel = ...;
            QueueingBasicConsumer consumer = new QueueingBasicConsumer(channel);
            channel.BasicConsume(queueName, false, null, consumer);  //<-----
            channel.BasicConsume(queueName2, false, null, consumer); //<-----
            // etc. channel.BasicConsume(queueNameN, false, null, consumer);  //<-----

            // At this point, messages will be being asynchronously delivered,
            // and will be queueing up in consumer.Queue.

            while (true) {
                try {
                    BasicDeliverEventArgs e = (BasicDeliverEventArgs) consumer.Queue.Dequeue();
                    // ... handle the delivery ...
                    channel.BasicAck(e.DeliveryTag, false);
                } catch (EndOfStreamException ex) {
                    // The consumer was cancelled, the model closed, or the
                    // connection went away.
                    break;
                }
            }

-- UPDATE 5 : a simple get that will act on any queue (a slower, but sometimes more convenient method).

            ch.QueueDeclare(queueName);
            BasicGetResult result = ch.BasicGet(queueName, false);
            if (result == null) {
                Console.WriteLine("No message available.");
            } else {
                ch.BasicAck(result.DeliveryTag, false);
                Console.WriteLine("Message:"); 
                // deserialize body and display extra info here.
            }
荆棘i 2024-11-26 20:23:40

最简单的方法是使用 EventingBasicConsumer。我的网站上有一个关于如何使用它的示例。 RabbitMQ EventingBasicConsumer

这个 Consumer 类公开了一个您可以使用的 Received Event,因此不阻止。其余代码基本保持不变。

The easiest way is to use the EventingBasicConsumer. I have an example on my site on how to use it. RabbitMQ EventingBasicConsumer

This Consumer class exposes a Received Event you can use, and therefore does NOT block. The rest of the code basically stays the same.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文