如果有大量请求尝试同时使用ipooledObject -policy,则createConnection()失败

发布于 2025-02-12 23:58:04 字数 2491 浏览 0 评论 0原文

我需要在.NET应用程序中使用RabbitMQ。我的策略是拥有一个iConnection和多个imodelipooledobjectPolicy处理。

问题是,如果有大量请求尝试同时使用sendMessage方法,我会在factory.createconnection();上获得此错误消息,我相信消息本身与实际问题无关:

rabbitmq.client.exceptions.brokerunreachableException
消息=指定的终点都不到达
ioException:Connection。从未收到启动,可能是由于网络超时

引起的

如果只有一个请求尝试使用sendmessage首次使用,那么在此之后一切正常,即使同时有大量请求触发。

这是实现ipooledObjectPolicy< imodel>

private readonly RabbitMqConfigurations rabbitMqConfigurations;
private readonly IConnection _connection;

public RabbitModelPooledObjectPolicy(IOptions<RabbitMqConfigurations> RabbitMqConfigurations)
{
    rabbitMqConfigurations = RabbitMqConfigurations.Value;
    _connection = GetConnection();
}

private IConnection GetConnection()
{
    var factory = new ConnectionFactory
    {
        HostName = rabbitMqConfigurations.HostName
    };

    return factory.CreateConnection(); //<-- it fails
}

public IModel Create() => return _connection.CreateModel();

public bool Return(IModel obj)
{
    if (obj.IsOpen)
        return true;
    else
    {
        obj?.Dispose();
        return false;
    }
}

这就是我使用ipooledobjectpolicy&lt; imodel&gt;

private readonly DefaultObjectPool<IModel> objectPool;

public RabbitMQProducer(IPooledObjectPolicy<IModel> objectPolicy)
{
    objectPool = new DefaultObjectPool<IModel>(objectPolicy, Environment.ProcessorCount * 2);
}

public void SendMessage<T>(T message, string queue) where T : class
{
    var channel = objectPool.Get();

    try
    {
        channel.QueueDeclare(queue: queue, durable: true, exclusive: false, autoDelete: false, arguments: null);

        var properties = channel.CreateBasicProperties();
        properties.Persistent = true;

        var json = JsonConvert.SerializeObject(message);
        var body = Encoding.UTF8.GetBytes(json);

        channel.BasicPublish(exchange: "", routingKey: queue, basicProperties: properties, body: body);
    }
    catch
    {
        throw;
    }
    finally
    {
        objectPool.Return(channel);
    }
}

我将这两个服务注册为Singleton:Singleton:

services.AddSingleton<IPooledObjectPolicy<IModel>, RabbitModelPooledObjectPolicy>();
services.AddSingleton<IMessageProducer, RabbitMQProducer>();

I need to use RabbitMQ in my .NET application. My strategy is to have a single IConnection and multiple IModel handled by IPooledObjectPolicy.

Problem is if a huge number of requests try to use SendMessage method simultaneously for the first time, I get this error message on factory.CreateConnection(); I believe the message itself is irrelevant to the actual problem:

RabbitMQ.Client.Exceptions.BrokerUnreachableException
Message=None of the specified endpoints were reachable
IOException: connection.start was never received, likely due to a network timeout

If only one request tries to use SendMessage for the first time, everything works fine after that even if a huge number of requests triggers simultaneously.

Here is implementation of the IPooledObjectPolicy<IModel>:

private readonly RabbitMqConfigurations rabbitMqConfigurations;
private readonly IConnection _connection;

public RabbitModelPooledObjectPolicy(IOptions<RabbitMqConfigurations> RabbitMqConfigurations)
{
    rabbitMqConfigurations = RabbitMqConfigurations.Value;
    _connection = GetConnection();
}

private IConnection GetConnection()
{
    var factory = new ConnectionFactory
    {
        HostName = rabbitMqConfigurations.HostName
    };

    return factory.CreateConnection(); //<-- it fails
}

public IModel Create() => return _connection.CreateModel();

public bool Return(IModel obj)
{
    if (obj.IsOpen)
        return true;
    else
    {
        obj?.Dispose();
        return false;
    }
}

This is how I used the IPooledObjectPolicy<IModel>:

private readonly DefaultObjectPool<IModel> objectPool;

public RabbitMQProducer(IPooledObjectPolicy<IModel> objectPolicy)
{
    objectPool = new DefaultObjectPool<IModel>(objectPolicy, Environment.ProcessorCount * 2);
}

public void SendMessage<T>(T message, string queue) where T : class
{
    var channel = objectPool.Get();

    try
    {
        channel.QueueDeclare(queue: queue, durable: true, exclusive: false, autoDelete: false, arguments: null);

        var properties = channel.CreateBasicProperties();
        properties.Persistent = true;

        var json = JsonConvert.SerializeObject(message);
        var body = Encoding.UTF8.GetBytes(json);

        channel.BasicPublish(exchange: "", routingKey: queue, basicProperties: properties, body: body);
    }
    catch
    {
        throw;
    }
    finally
    {
        objectPool.Return(channel);
    }
}

I registered these two services as singleton:

services.AddSingleton<IPooledObjectPolicy<IModel>, RabbitModelPooledObjectPolicy>();
services.AddSingleton<IMessageProducer, RabbitMQProducer>();

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(1

深海少女心 2025-02-19 23:58:04

我可以设法解决这个问题。

说明问题:

rabbitmodelpooledobjectpolicy构造器首次访问课时。在执行构造函数的同时,应用程序也试图处理其他请求,并且请求很多,并且不断地像不间断的流一样出现。

因此,似乎某种程度上的应用程序正在放弃factory.createconnection()临时处理其他即将到来的请求,以执行factory.createconnection()稍后。但是,没有终点的请求,并且相同的情况会发生,直到factory.createconnection()引发了超时的例外。

解决方案:

我热身ipooledobject -policy&lt; imodel&gt; in configure in startup.cs ececute eccodute rabbitMbitModelpoolpoolpoolpoolpoolpoolpoolpoolpoolpoolpoolpoolpolicy启动中的构造函数在应用程序准备就绪并收到请求之前。

public void Configure(IApplicationBuilder app, IWebHostEnvironment env){
    app.ApplicationServices.GetService<IPooledObjectPolicy<IModel>>();
}

I could manage to fix this.

Explanation of the problem:

RabbitModelPooledObjectPolicy constructor executes when class is accessed for the first time. While the constructor is executing, application is trying to handle other requests as well and the requests are a lot and are keep coming like a non-stop stream.

Therefore it seems somehow application is abandoning factory.CreateConnection() temporarily to handle other coming requests in order to executes factory.CreateConnection() later. However, there is no end to coming requests and the same scenario goes until factory.CreateConnection() throws a time out exception.

Solution:

I warmed up IPooledObjectPolicy<IModel> in Configure method in startup.cs to execute RabbitModelPooledObjectPolicy constructor in startup before application gets ready and receives requests.

public void Configure(IApplicationBuilder app, IWebHostEnvironment env){
    app.ApplicationServices.GetService<IPooledObjectPolicy<IModel>>();
}
~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文