RabbitMQ 通道创建指南

发布于 2024-11-02 00:08:39 字数 412 浏览 6 评论 0 原文

我正在编写一个简单的类,我的应用程序将使用它来使用 RabbitMQ 发送和接收消息。 我已经阅读了尽可能多的有关 RabbitMQ 的操作方法、博客文章、白皮书和类似内容。 大多数示例都将连接和通道包装在 using 块中,并反驳说您应该将它们实现为单例。 具体来说,关于通道,我看到评论说您不应该有多个线程同时使用单个通道。

我正在用 C# 编写我的库。它是一个在第一次实例化时具有静态连接的单例。

我考虑过对频道做同样的事情,但我打算使用相同的库来允许发布/订阅多个交换/队列。发布和订阅都可以通过多个线程完成。

最后我的问题是: 我应该如何实施渠道创建? 每条消息? 每个消费者是否都有一个独特的私人频道,发布者是否同步访问单个唯一频道? 你明白我的意思了。 请记住,我打算使用一台服务器,其中有几十个消费者/发布者,仅此而已。

I'm writing a simple class that my apps will use to send and receive messages using RabbitMQ.
I've read as many how-tos, blog posts, white papers and the likes about RabbitMQ as I could find.
Most of the examples have the connection and channel wrapped in a using block, and contradict it by saying that you should probably implement them as a singleton.
Specifically, regarding the channel, I've seen comments saying that you shouldn't have more than a single thread using a single channel at the same time.

I'm writing my library in C#. It's a singleton having a static connection connected on first instantiation.

I thought about doing the same for the channel, but I intend to use the same library to allow publishing/subscribing to multiple exchanges/queues. Both publishing and subscribing might be done from multiple threads.

And finally my question:
How should I implement channel creation?
Per message?
Have each consumer have a unique private channel, publisher sync access to a single unique channel?
You catch my drift.
Please keep in mind that I'm intending to use a single server, with several dozens of consumers/publishers, not much more.

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(6

千年*琉璃梦 2024-11-09 00:08:39

借助 ASP.NET Core,您可以利用 ObjectPool。创建 IPooledObjectPolicy

    using Microsoft.Extensions.ObjectPool;  
    using Microsoft.Extensions.Options;  
    using RabbitMQ.Client;  

    public class RabbitModelPooledObjectPolicy : IPooledObjectPolicy<IModel>  
    {  
        private readonly RabbitOptions _options;  

        private readonly IConnection _connection;  

        public RabbitModelPooledObjectPolicy(IOptions<RabbitOptions> optionsAccs)  
        {  
            _options = optionsAccs.Value;  
            _connection = GetConnection();  
        }  

        private IConnection GetConnection()  
        {  
            var factory = new ConnectionFactory()  
            {  
                HostName = _options.HostName,  
                UserName = _options.UserName,  
                Password = _options.Password,  
                Port = _options.Port,  
                VirtualHost = _options.VHost,  
            };  

            return factory.CreateConnection();  
        }  

        public IModel Create()  
        {  
            return _connection.CreateModel();  
        }  

        public bool Return(IModel obj)  
        {  
            if (obj.IsOpen)  
            {  
                return true;  
            }  
            else  
            {  
                obj?.Dispose();  
                return false;  
            }  
        }  
    }  

然后为 ObjectPool 配置依赖注入

services.AddSingleton<ObjectPoolProvider, DefaultObjectPoolProvider>();
services.AddSingleton(s =>
{
   var provider = s.GetRequiredService<ObjectPoolProvider>();
   return provider.Create(new RabbitModelPooledObjectPolicy());
});

然后您可以注入 ObjectPool 并使用它

var channel = pool.Get();
try
{
    channel.BasicPublish(...);
}
finally
{
    pool.Return(channel);
}

来源:

https://www.c-sharpcorner.com/article/publishing-rabbitmq-message-in-asp-net-core/

https://developpaper.com/detailed-explanation-of-object-pools-various-usages-in-net-core/

With ASP.NET Core, there is ObjectPool that you can leverage on. Create an IPooledObjectPolicy

    using Microsoft.Extensions.ObjectPool;  
    using Microsoft.Extensions.Options;  
    using RabbitMQ.Client;  

    public class RabbitModelPooledObjectPolicy : IPooledObjectPolicy<IModel>  
    {  
        private readonly RabbitOptions _options;  

        private readonly IConnection _connection;  

        public RabbitModelPooledObjectPolicy(IOptions<RabbitOptions> optionsAccs)  
        {  
            _options = optionsAccs.Value;  
            _connection = GetConnection();  
        }  

        private IConnection GetConnection()  
        {  
            var factory = new ConnectionFactory()  
            {  
                HostName = _options.HostName,  
                UserName = _options.UserName,  
                Password = _options.Password,  
                Port = _options.Port,  
                VirtualHost = _options.VHost,  
            };  

            return factory.CreateConnection();  
        }  

        public IModel Create()  
        {  
            return _connection.CreateModel();  
        }  

        public bool Return(IModel obj)  
        {  
            if (obj.IsOpen)  
            {  
                return true;  
            }  
            else  
            {  
                obj?.Dispose();  
                return false;  
            }  
        }  
    }  

Then configure dependency injection for ObjectPool

services.AddSingleton<ObjectPoolProvider, DefaultObjectPoolProvider>();
services.AddSingleton(s =>
{
   var provider = s.GetRequiredService<ObjectPoolProvider>();
   return provider.Create(new RabbitModelPooledObjectPolicy());
});

You can then inject ObjectPool<IModel>, and use it

var channel = pool.Get();
try
{
    channel.BasicPublish(...);
}
finally
{
    pool.Return(channel);
}

Sources:

https://www.c-sharpcorner.com/article/publishing-rabbitmq-message-in-asp-net-core/

https://developpaper.com/detailed-explanation-of-object-pools-various-usages-in-net-core/

你怎么这么可爱啊 2024-11-09 00:08:39

编辑(2016-1-26):通道不是线程安全的。有关该内容的文档在 4 月之间发生了更改五月 2015 年。新文本:

通道实例不得在线程之间共享。应用程序应该更喜欢每个线程使用一个 Channel,而不是在多个线程之间共享相同的 Channel。虽然通道上的某些操作可以安全地并发调用,但有些操作则不然,并且会导致线路上的帧交错不正确。在线程之间共享通道也会干扰*发布者确认。

从您的问题来看,您似乎没有预定义的固定数量的线程,这些线程主要用于发布/订阅 RabbitMQ (在这种情况下,您可能会考虑创建一个通道作为线程初始化的一部分,或者使用 ThreadLocal)。

如果并发 RabbitMQ 操作很少或者消息大小总是很小,您可能只需在所有 RabbitMQ 发布/订阅操作周围放置一个锁(通道)就可以了。如果您需要使用任意线程以交错方式传输多个请求(这就是通道的用途),您可能需要创建一个通道池,例如 ConcurrentQueueConcurrentQueue。 IModel>,您可以在其中将未使用的通道入队,并在需要时将其出队。 通道创建的开销非常低,并且从性能测试中我感觉通道创建的过程不涉及任何网络io,即似乎客户端第一次使用时会在 RabbitMQ 服务器中自动创建一个通道。 编辑:谢谢 Pang,不需要每次操作都打开一个通道,这样做效率非常低,因为打开通道是一次网络往返。


OLD(预2016-1-26):Java 和 .net 实现的现在大部分已过时的细节:

Re:通道和多线程,由于其对实现的依赖而有点令人困惑。

Java实现通道是线程安全的 :

通道实例可以安全地被多个线程使用。

但是

当多个线程之间共享 Channel 时,确认处理不正确

.net 实现通道不是线程安全的

如果多个线程需要访问某一特定 IModel 实例,应用程序应自行强制执行互斥。

IModel 操作序列化不正确的症状包括但不限于:

• 线路上发送的帧序列无效

• 抛出 NotSupportedExceptions ...

因此,除了 Robin 的有用答案(无论线程是否安全都适用)之外, 在 .net 实现中,您不能只共享频道

Edit (2016-1-26): Channels are NOT thread safe. The documentation on that has changed between April and May 2015. The new text:

Channel instances must not be shared between threads. Applications should prefer using a Channel per thread instead of sharing the same Channel across multiple threads. While some operations on channels are safe to invoke concurrently, some are not and will result in incorrect frame interleaving on the wire. Sharing channels between threads will also interfere with * Publisher Confirms.

From your question it sounds like you don't have a predefined, fixed number of threads that do mostly publishing / subscribing to RabbitMQ (in which case you might consider creating a channel as part of the initialization of the thread, or using a ThreadLocal<IModel>).

If concurrent RabbitMQ operations are rare or message sizes always small, you might get away with simply putting a lock(channel) around all your RabbitMQ pub/sub operations. If you need multiple requests to be transmitted in an interleaved fashion - that's what channels are for in the first place - using arbitrary threads, you might want to create a channel pool, e.g. a ConcurrentQueue<IModel> where you Enqueue unused channels and Dequeue for the time you need them. Channel creation is very low-overhead, and I have the feeling, from performance tests, that the process of channel creation does not involve any network io, i.e. it seems a channel gets automatically created in the RabbitMQ server on first use by a client. Edit: Thanks Pang, There is no need to open a channel per operation and doing so would be very inefficient, since opening a channel is a network roundtrip.


OLD (pre 2016-1-26): The now mostly obsolete details of the Java and .net implementations:

Re: channels and multiple threads, which is a bit confusing due to its dependence on the implementation.

Java implementation: Channels are thread safe:

Channel instances are safe for use by multiple threads.

But:

confirms are not handled properly when a Channel is shared between multiple threads

.net implementation: Channels are not thread safe:

If more than one thread needs to access a particular IModel instances, the application should enforce mutual exclusion itself.

Symptoms of incorrect serialisation of IModel operations include, but are not limited to,

• invalid frame sequences being sent on the wire

• NotSupportedExceptions being thrown ...

So in addition to Robin's useful answer, which applies regardless of whether it's thread safe or not, in the .net implementation, you can't just share a channel.

装迷糊 2024-11-09 00:08:39

它阐明了 aqmp 的内部结构。
目前,我的理解是:

A. 我可以从每个应用程序保持到服务器的单个共享 TCP 连接(作为静态共享资源)

B. 我应该为每个“任务”创建一个通道(一个用于监听队列X 和一个用于发布以交换 Y 等。假设这些“任务”可以并行执行)

C。或者我可以对单个应用程序中的所有内容使用一个通道,同时确保对其的访问是同步的 - 使用某种锁定机制,假设使用通道的实际时间跨度(锁定)相对非常短。

It clarifies aqmp internals.
Currently, my understanding is:

A. I can hold a single, shared, tcp connection to the server from each application (as a static shared resource)

B. I should either create a channel for each "task" (one for listening to queue X and one for publishing to exchange Y, etc. assuming these "tasks" can be executed in parallel)

C. Or I can use one channel for everything within a single app, while making sure access to it is synchronized - using some locking mechanism, assuming that the actual time spans the channel is used (locked) are relatively very short.

念﹏祤嫣 2024-11-09 00:08:39

我无法评论 C# 实现的细节,但了解 Amqp 通道被设计为共享单个 TCP 连接(即启用多路复用)可能会有所帮助。单个通道一次只能发送一条消息或接收一条消息,但一个连接可以同时接收不同通道上的消息。假设您有 2 个 1GB 的大文件,通过 Amqp 发送给单个消费者,消息可能会被分割成 10K 块并以交错方式发送。您可以在设置连接时操纵默认的 Amqp 消息大小,这会影响您是否以及何时可能遇到交错;据我所知,此功能旨在帮助防止当多个消费者共享连接并且一个消费者接收大消息时出现饥饿。

HTH。

I can't comment on the specifics of a C# implementation, but it may help to know that Amqp channels are designed to share a single TCP connection, i.e. to enable multiplexing. A single channel can only send one or receive one message at once, but a connection can receive messages on different channels simultaneously. Image you've got 2 large 1GB files that you send over Amqp to a single consumer, it's possible that the messages will be split up in to 10K chunks and sent in an interleaved fashion. You can manipulate the default Amqp message size when you're setting up the connection, this has a bearing on whether and when you're likely to experience interleaving; AFAIK this feature is intended to help prevent starvation when multiple consumers share a connection and one consumer receives large messages.

HTH.

梦醒灬来后我 2024-11-09 00:08:39

您是对的,通道不是线程安全的,并且不应由多个线程访问。

如果您计划使用不同的队列,则可以在一个通道中拥有多个队列。但是,如果您计划进行多个交换(不确定为什么在这里需要多个交换),您将被迫要么在单例中跟踪多个交换和通道,要么将该责任转移给调用者。

我将构建您的单例,例如它具有观察者模式,并在其中保存所有 RabbitMQ 内容以及对调用者的引用。然后,您还需要将调用编组到处理此问题的单个线程,否则可能会面临通道对象出现问题的风险。

You are correct that the Channel is NOT threadsafe and should not be accessed by more than one thread.

If you're planning on using different queues, you can have multiple queues with a single channel. But if you're planning on multiple exchanges (not sure why you would want more than one here), you would be forced to either keep track of multiple exchanges and channels in your singleton, or offload that responsibility to the callers.

I would build your singleton such as it has an observer pattern and hold all the RabbitMQ stuff in there with references to your callers. You would also then need to marshall calls to a single thread that deals with this or you could risk problems with the channel objects.

凉世弥音 2024-11-09 00:08:39

消费者的并发注意事项
有许多与并发相关的主题供库用户考虑。
在线程之间共享通道
应避免多个线程同时使用 IModel 实例。应用程序代码应该维护 IModel 实例的线程所有权的明确概念。
这对发布者来说是一个硬性要求:共享并发发布的通道(IModel 实例)将导致协议级别的错误帧交错。 频道实例不得被在其上发布的线程共享

RabbitMQ 消费者并发注意事项

Concurrency Considerations for Consumers
There is a number of concurrency-related topics for a library user to consider.
Sharing Channels Between Threads
IModel instance usage by more than one thread simultaneously should be avoided. Application code should maintain a clear notion of thread ownership for IModel instances.
This is a hard requirement for publishers: sharing a channel (an IModel instance) for concurrent publishing will lead to incorrect frame interleaving at the protocol level. Channel instances must not be shared by threads that publish on them.

RabbitMQ Concurrency Considerations for Consumers

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文