CreateConnection() fails if a huge number of requests try to use IPooledObjectPolicy simultaneously
I need to use RabbitMQ in my .NET application. My strategy is to have a single IConnection and multiple IModel instances handled by IPooledObjectPolicy.

The problem is that if a huge number of requests try to use the SendMessage method simultaneously for the first time, I get this error message on factory.CreateConnection(). I believe the message itself is irrelevant to the actual problem:

RabbitMQ.Client.Exceptions.BrokerUnreachableException
Message=None of the specified endpoints were reachable
IOException: connection.start was never received, likely due to a network timeout

If only one request uses SendMessage for the first time, everything works fine after that, even if a huge number of requests then arrive simultaneously.

Here is the implementation of IPooledObjectPolicy<IModel>:
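using Microsoft.Extensions.ObjectPool;
using Microsoft.Extensions.Options;
using RabbitMQ.Client;

public class RabbitModelPooledObjectPolicy : IPooledObjectPolicy<IModel>
{
    private readonly RabbitMqConfigurations rabbitMqConfigurations;
    private readonly IConnection _connection;

    public RabbitModelPooledObjectPolicy(IOptions<RabbitMqConfigurations> RabbitMqConfigurations)
    {
        rabbitMqConfigurations = RabbitMqConfigurations.Value;
        // The single shared connection is opened once, when the policy is constructed.
        _connection = GetConnection();
    }

    private IConnection GetConnection()
    {
        var factory = new ConnectionFactory
        {
            HostName = rabbitMqConfigurations.HostName
        };
        return factory.CreateConnection(); //<-- it fails
    }

    // Each pooled object is a channel (IModel) on the shared connection.
    public IModel Create() => _connection.CreateModel();

    public bool Return(IModel obj)
    {
        // Keep open channels in the pool; dispose closed ones.
        if (obj.IsOpen)
            return true;

        obj.Dispose();
        return false;
    }
}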
This is how I use IPooledObjectPolicy<IModel>:
using System;
using System.Text;
using Microsoft.Extensions.ObjectPool;
using Newtonsoft.Json;
using RabbitMQ.Client;

public class RabbitMQProducer : IMessageProducer
{
    private readonly DefaultObjectPool<IModel> objectPool;

    public RabbitMQProducer(IPooledObjectPolicy<IModel> objectPolicy)
    {
        objectPool = new DefaultObjectPool<IModel>(objectPolicy, Environment.ProcessorCount * 2);
    }

    public void SendMessage<T>(T message, string queue) where T : class
    {
        var channel = objectPool.Get();
        try
        {
            channel.QueueDeclare(queue: queue, durable: true, exclusive: false, autoDelete: false, arguments: null);
            var properties = channel.CreateBasicProperties();
            properties.Persistent = true;
            var json = JsonConvert.SerializeObject(message);
            var body = Encoding.UTF8.GetBytes(json);
            channel.BasicPublish(exchange: "", routingKey: queue, basicProperties: properties, body: body);
        }
        finally
        {
            // Always hand the channel back to the pool, even if publishing throws.
            objectPool.Return(channel);
        }
    }
}
I registered these two services as singletons:
services.AddSingleton<IPooledObjectPolicy<IModel>, RabbitModelPooledObjectPolicy>();
services.AddSingleton<IMessageProducer, RabbitMQProducer>();
Comments (1)
I managed to fix this.
Explanation of the problem:

The RabbitModelPooledObjectPolicy constructor executes when the class is accessed for the first time. While the constructor is executing, the application is also trying to handle other requests, and there are many of them, arriving like a non-stop stream. It therefore seems that the application somehow abandons factory.CreateConnection() temporarily to handle the other incoming requests, intending to execute factory.CreateConnection() later. However, the incoming requests never end, and the same scenario repeats until factory.CreateConnection() throws a timeout exception.

Solution:

I warmed up IPooledObjectPolicy<IModel> in the Configure method in Startup.cs, so that the RabbitModelPooledObjectPolicy constructor executes at startup, before the application is ready and receives requests.
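
The warm-up described above could look roughly like the following. This is a minimal sketch, assuming a classic Startup class; the answer does not show its code, so the exact call used is an assumption. It relies on the RabbitModelPooledObjectPolicy, RabbitMQProducer, and IMessageProducer types from the question, and it leaves out the RabbitMqConfigurations binding and the rest of the middleware pipeline.

```csharp
using Microsoft.AspNetCore.Builder;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.ObjectPool;
using RabbitMQ.Client;

public class Startup
{
    public void ConfigureServices(IServiceCollection services)
    {
        // Same registrations as in the question.
        services.AddSingleton<IPooledObjectPolicy<IModel>, RabbitModelPooledObjectPolicy>();
        services.AddSingleton<IMessageProducer, RabbitMQProducer>();
    }

    public void Configure(IApplicationBuilder app)
    {
        // Assumed warm-up step: resolving the singleton here runs the
        // RabbitModelPooledObjectPolicy constructor, and therefore
        // factory.CreateConnection(), before any request is served.
        _ = app.ApplicationServices.GetRequiredService<IPooledObjectPolicy<IModel>>();

        // ... the rest of the request pipeline goes here ...
    }
}
```

Because the policy is registered as a singleton, the instance created during Configure is the same one later injected into RabbitMQProducer, so the connection is opened only once.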