VERTX事件总线缓慢消耗问题

发布于 2025-01-20 20:14:47 字数 2445 浏览 4 评论 0原文

我们有一个非聚类的VERTX应用程序,我们使用事件总线在垂直之间进行内部通信。

  • Verticle A从公共汽车上消耗,执行HTTP请求,并通过公共汽车发送回复。

  • Verticle B只是要求执行该HTTP请求。

当Verticle B执行“高”请求量时,就会出现问题。然后,消费者开始收到较慢的事件(大概是因为它们在事件总线中排队)。对于8个请求/秒,总线最多需要3-4秒即可消耗事件。当请求/秒提高时,消耗它可能需要超过30秒的时间,因此触发了总线超时。

问题是,垂直A的执行HTTP操作(〜200ms)的确很快,因此我真的不明白为什么请求被卡在公共汽车上。

我们已经尝试了许多解决方案,但是没有其他方法可以使用:

  • 部署垂直a的多个实例,因为工人
  • 使用vertx.executeblocking()执行HTTP请求,

唯一有效的是评论HTTP请求并通过总线返回模拟对象。但是,同样,HTTP请求不得超过200ms,因此它不应阻止总线。

其他信息:我们使用使用Raturofit + OkhttpClient的自动化的REST客户端。由于公司策略,我们无法使用VERTX WebClient,因此我没有尝试此解决方案。

示例

这是我们代码的真正简化版本,因此您可以检查我是否缺少某些内容。

垂直A

// Instantiated in Verticle A
public class EmailSender {

    private final Vertx vertx;
    private final EmailApiClient emailApiClient;

    public EmailSender(Vertx vertx) {
        this.vertx = vertx;
        emailApiClient = ClientFactory.createEmailApiClient();
    }

    public void start() {
        vertx.eventBus().consumer("sendEmail", this::sendEmail);
    }

    public void sendEmail(Message<EmailRequest> message) {
        EmailRequest emailRequest = message.body();
        emailApiClient.sendEmail(emailRequest).subscribe(
            response -> {
                if (response.code() == 200) {
                    EmailResponse emailResponse = response.body();
                    message.reply(emailResponse);

                } else {
                    message.fail(500, "Error sending email");
                }
            });
    }
}

verticer b

// Instantiated in Verticle B
public class EmailCommunications {

    private final Vertx vertx;

    public EmailCommunications(Vertx vertx) {
        this.vertx = vertx;
    }

    public Single<EmailResponse> sendEmail(EmailRequest emailRequest) {
        SingleSubject<EmailResponse> emailSent = SingleSubject.create();
        vertx.eventBus().request(
            "sendEmail",
            emailRequest,
            busResult -> {
                if (busResult.succeded()) {
                    emailSent.onSuccess(busResult.result().body())
                } else {
                    emailSent.onError(busResult.cause())
                }
            }
        );
        return emailSent;
    }
}

We have a non clustered vertx application, and we use the event bus to internally communicate between verticles.

  • Verticle A consumes from the bus, performs a HTTP request, and sends the response back through the bus.

  • Verticle B just request to perform that HTTP request.

The problem appears when a "high" request volume is performed by Verticle B. Then, the consumer starts receiving the events slower and slower (presumably because they are getting queued in the event bus). For 8 requests/second the bus takes up to 3-4 seconds to consume the event. When the requests/second are elevated, it can take more than 30 seconds to consume it, so the bus timeout is triggered.

The thing is, Verticle A is really fast performing the HTTP operation (~200ms) so I don't really understand why the requests get stuck in the bus.

We've tried many solutions but none ot then worked:

  • Deploy multiple instances of Verticle A as workers
  • Use vertx.executeBlocking() to perform the HTTP request

The only thing that worked was commenting the HTTP request and returning a mock object through the bus. But again, the HTTP request doesn't take more than 200ms, so it shouldn't be blocking the bus.

Additional information: We use an autogenerated rest client that uses Retrofit + OkHttpClient. Due to company policy, we cannot use Vertx WebClient, so I didn't try this solution.

EXAMPLE

This is a really simplified version of our code so you can check if I'm missing something.

VERTICLE A

// Instantiated in Verticle A
public class EmailSender {

    private final Vertx vertx;
    private final EmailApiClient emailApiClient;

    public EmailSender(Vertx vertx) {
        this.vertx = vertx;
        emailApiClient = ClientFactory.createEmailApiClient();
    }

    public void start() {
        vertx.eventBus().consumer("sendEmail", this::sendEmail);
    }

    public void sendEmail(Message<EmailRequest> message) {
        EmailRequest emailRequest = message.body();
        emailApiClient.sendEmail(emailRequest).subscribe(
            response -> {
                if (response.code() == 200) {
                    EmailResponse emailResponse = response.body();
                    message.reply(emailResponse);

                } else {
                    message.fail(500, "Error sending email");
                }
            });
    }
}

VERTICLE B

// Instantiated in Verticle B
public class EmailCommunications {

    private final Vertx vertx;

    public EmailCommunications(Vertx vertx) {
        this.vertx = vertx;
    }

    public Single<EmailResponse> sendEmail(EmailRequest emailRequest) {
        SingleSubject<EmailResponse> emailSent = SingleSubject.create();
        vertx.eventBus().request(
            "sendEmail",
            emailRequest,
            busResult -> {
                if (busResult.succeded()) {
                    emailSent.onSuccess(busResult.result().body())
                } else {
                    emailSent.onError(busResult.cause())
                }
            }
        );
        return emailSent;
    }
}

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(1

糖果控 2025-01-27 20:14:47

我们修复了更改OKHTTPCLIENT配置的问题,因此HTTP请求不会卡住


default void configureOkHttpClient(OkHttpClient.Builder okHttpClientBuilder) {
        ConnectionPool connectionPool = new ConnectionPool(40, 5, TimeUnit.MINUTES);
        Dispatcher dispatcher = new Dispatcher();
        dispatcher.setMaxRequestsPerHost(200);
        dispatcher.setMaxRequests(200);

        okHttpClientBuilder
            .readTimeout(60, TimeUnit.SECONDS)
            .retryOnConnectionFailure(true)
            .connectionPool(connectionPool)
            .dispatcher(dispatcher);
    }

We fixed the issue changing our OkHttpClient configuration so HTTP requests won't get stuck


default void configureOkHttpClient(OkHttpClient.Builder okHttpClientBuilder) {
        ConnectionPool connectionPool = new ConnectionPool(40, 5, TimeUnit.MINUTES);
        Dispatcher dispatcher = new Dispatcher();
        dispatcher.setMaxRequestsPerHost(200);
        dispatcher.setMaxRequests(200);

        okHttpClientBuilder
            .readTimeout(60, TimeUnit.SECONDS)
            .retryOnConnectionFailure(true)
            .connectionPool(connectionPool)
            .dispatcher(dispatcher);
    }

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文