我正在编写一个简单的类,我的应用程序将使用它来使用 RabbitMQ 发送和接收消息。
我已经阅读了尽可能多的有关 RabbitMQ 的操作方法、博客文章、白皮书和类似内容。
大多数示例都将连接和通道包装在 using
块中,并反驳说您应该将它们实现为单例。
具体来说,关于通道,我看到评论说您不应该有多个线程同时使用单个通道。
我正在用 C# 编写我的库。它是一个在第一次实例化时具有静态连接的单例。
我考虑过对频道做同样的事情,但我打算使用相同的库来允许发布/订阅多个交换/队列。发布和订阅都可以通过多个线程完成。
最后我的问题是:
我应该如何实施渠道创建?
每条消息?
每个消费者是否都有一个独特的私人频道,发布者是否同步访问单个唯一频道?
你明白我的意思了。
请记住,我打算使用一台服务器,其中有几十个消费者/发布者,仅此而已。
I'm writing a simple class that my apps will use to send and receive messages using RabbitMQ.
I've read as many how-tos, blog posts, white papers and the likes about RabbitMQ as I could find.
Most of the examples have the connection and channel wrapped in a using
block, and contradict it by saying that you should probably implement them as a singleton.
Specifically, regarding the channel, I've seen comments saying that you shouldn't have more than a single thread using a single channel at the same time.
I'm writing my library in C#. It's a singleton having a static connection connected on first instantiation.
I thought about doing the same for the channel, but I intend to use the same library to allow publishing/subscribing to multiple exchanges/queues. Both publishing and subscribing might be done from multiple threads.
And finally my question:
How should I implement channel creation?
Per message?
Have each consumer have a unique private channel, publisher sync access to a single unique channel?
You catch my drift.
Please keep in mind that I'm intending to use a single server, with several dozens of consumers/publishers, not much more.
发布评论
评论(6)
借助 ASP.NET Core,您可以利用 ObjectPool。创建 IPooledObjectPolicy
然后为 ObjectPool 配置依赖注入
然后您可以注入
ObjectPool
并使用它来源:
https://www.c-sharpcorner.com/article/publishing-rabbitmq-message-in-asp-net-core/
https://developpaper.com/detailed-explanation-of-object-pools-various-usages-in-net-core/
With ASP.NET Core, there is ObjectPool that you can leverage on. Create an IPooledObjectPolicy
Then configure dependency injection for ObjectPool
You can then inject
ObjectPool<IModel>
, and use itSources:
https://www.c-sharpcorner.com/article/publishing-rabbitmq-message-in-asp-net-core/
https://developpaper.com/detailed-explanation-of-object-pools-various-usages-in-net-core/
编辑(2016-1-26):通道不是线程安全的。有关该内容的文档在 4 月之间发生了更改 和 五月 2015 年。新文本:
从您的问题来看,您似乎没有预定义的固定数量的线程,这些线程主要用于发布/订阅 RabbitMQ (在这种情况下,您可能会考虑创建一个通道作为线程初始化的一部分,或者使用
ThreadLocal
)。如果并发 RabbitMQ 操作很少或者消息大小总是很小,您可能只需在所有 RabbitMQ 发布/订阅操作周围放置一个
锁(通道)
就可以了。如果您需要使用任意线程以交错方式传输多个请求(这就是通道的用途),您可能需要创建一个通道池,例如ConcurrentQueue
ConcurrentQueue。 IModel>
,您可以在其中将未使用的通道入队,并在需要时将其出队。通道创建的开销非常低,并且从性能测试中我感觉通道创建的过程不涉及任何网络io,即似乎客户端第一次使用时会在 RabbitMQ 服务器中自动创建一个通道。编辑:谢谢 Pang,不需要每次操作都打开一个通道,这样做效率非常低,因为打开通道是一次网络往返。
OLD(预2016-1-26):Java 和 .net 实现的现在大部分已过时的细节:
Re:通道和多线程,由于其对实现的依赖而有点令人困惑。
Java实现:通道是线程安全的 :
但是:
.net 实现:通道不是线程安全的:
因此,除了 Robin 的有用答案(无论线程是否安全都适用)之外, 在 .net 实现中,您不能只共享频道。
Edit (2016-1-26): Channels are NOT thread safe. The documentation on that has changed between April and May 2015. The new text:
From your question it sounds like you don't have a predefined, fixed number of threads that do mostly publishing / subscribing to RabbitMQ (in which case you might consider creating a channel as part of the initialization of the thread, or using a
ThreadLocal<IModel>
).If concurrent RabbitMQ operations are rare or message sizes always small, you might get away with simply putting a
lock(channel)
around all your RabbitMQ pub/sub operations. If you need multiple requests to be transmitted in an interleaved fashion - that's what channels are for in the first place - using arbitrary threads, you might want to create a channel pool, e.g. aConcurrentQueue<IModel>
where you Enqueue unused channels and Dequeue for the time you need them.Channel creation is very low-overhead, and I have the feeling, from performance tests, that the process of channel creation does not involve any network io, i.e. it seems a channel gets automatically created in the RabbitMQ server on first use by a client.Edit: Thanks Pang,There is no need to open a channel per operation and doing so would be very inefficient, since opening a channel is a network roundtrip.
OLD (pre 2016-1-26): The now mostly obsolete details of the Java and .net implementations:
Re: channels and multiple threads, which is a bit confusing due to its dependence on the implementation.
Java implementation: Channels are thread safe:
But:
.net implementation: Channels are not thread safe:
So in addition to Robin's useful answer, which applies regardless of whether it's thread safe or not, in the .net implementation, you can't just share a channel.
它阐明了 aqmp 的内部结构。
目前,我的理解是:
A. 我可以从每个应用程序保持到服务器的单个共享 TCP 连接(作为静态共享资源)
B. 我应该为每个“任务”创建一个通道(一个用于监听队列X 和一个用于发布以交换 Y 等。假设这些“任务”可以并行执行)
C。或者我可以对单个应用程序中的所有内容使用一个通道,同时确保对其的访问是同步的 - 使用某种锁定机制,假设使用通道的实际时间跨度(锁定)相对非常短。
It clarifies aqmp internals.
Currently, my understanding is:
A. I can hold a single, shared, tcp connection to the server from each application (as a static shared resource)
B. I should either create a channel for each "task" (one for listening to queue X and one for publishing to exchange Y, etc. assuming these "tasks" can be executed in parallel)
C. Or I can use one channel for everything within a single app, while making sure access to it is synchronized - using some locking mechanism, assuming that the actual time spans the channel is used (locked) are relatively very short.
我无法评论 C# 实现的细节,但了解 Amqp 通道被设计为共享单个 TCP 连接(即启用多路复用)可能会有所帮助。单个通道一次只能发送一条消息或接收一条消息,但一个连接可以同时接收不同通道上的消息。假设您有 2 个 1GB 的大文件,通过 Amqp 发送给单个消费者,消息可能会被分割成 10K 块并以交错方式发送。您可以在设置连接时操纵默认的 Amqp 消息大小,这会影响您是否以及何时可能遇到交错;据我所知,此功能旨在帮助防止当多个消费者共享连接并且一个消费者接收大消息时出现饥饿。
HTH。
I can't comment on the specifics of a C# implementation, but it may help to know that Amqp channels are designed to share a single TCP connection, i.e. to enable multiplexing. A single channel can only send one or receive one message at once, but a connection can receive messages on different channels simultaneously. Image you've got 2 large 1GB files that you send over Amqp to a single consumer, it's possible that the messages will be split up in to 10K chunks and sent in an interleaved fashion. You can manipulate the default Amqp message size when you're setting up the connection, this has a bearing on whether and when you're likely to experience interleaving; AFAIK this feature is intended to help prevent starvation when multiple consumers share a connection and one consumer receives large messages.
HTH.
您是对的,通道不是线程安全的,并且不应由多个线程访问。
如果您计划使用不同的队列,则可以在一个通道中拥有多个队列。但是,如果您计划进行多个交换(不确定为什么在这里需要多个交换),您将被迫要么在单例中跟踪多个交换和通道,要么将该责任转移给调用者。
我将构建您的单例,例如它具有观察者模式,并在其中保存所有 RabbitMQ 内容以及对调用者的引用。然后,您还需要将调用编组到处理此问题的单个线程,否则可能会面临通道对象出现问题的风险。
You are correct that the Channel is NOT threadsafe and should not be accessed by more than one thread.
If you're planning on using different queues, you can have multiple queues with a single channel. But if you're planning on multiple exchanges (not sure why you would want more than one here), you would be forced to either keep track of multiple exchanges and channels in your singleton, or offload that responsibility to the callers.
I would build your singleton such as it has an observer pattern and hold all the RabbitMQ stuff in there with references to your callers. You would also then need to marshall calls to a single thread that deals with this or you could risk problems with the channel objects.
RabbitMQ 消费者并发注意事项
RabbitMQ Concurrency Considerations for Consumers