Spring Cloud Stream StreamBridge性能低?

发布于 2025-01-13 00:43:00 字数 1557 浏览 0 评论 0原文

我正在使用 Spring Cloud StreamBridge 将消息发布到 RabbitMQ 交换。借助本机 RabbitMQ PerfTest,我可以使用单个生产者轻松获得高达 100k msgs/s(1 个通道)的速度。如果我使用发送 StreamBrige(也是 1 个通道)启动一个带有 while 循环的线程,我只会收到约 20k msgs/s 的类似设置(没有持久性,没有手动确认或确认,相同的 Docker 容器..)。我正在使用 Spring Cloud Stream 和 Rabbit Binder 3.2.2。

我的 yml 看起来像这样:

spring:
  rabbitmq:
    host: localhost
    port: 5672

  cloud:
    function:
      definition: producer1;

    stream:
      bindings:
        producer1-out-0:
          destination: messageQueue
          #requiredGroups: consumerGroup1,
      rabbit:
        bindings:
          producer1-out-0:
            producer:
              deliveryMode: NON_PERSISTENT
              exchangeType: direct
              bindingRoutingKey: default_message
              routingKeyExpression: '''default_message'''
              #maxLength: 1
      output-bindings: producer1;

我的发送循环,RabbitMQ PerfTest-Tool 是用 Java 编写的,看起来很相似:

        @Autowired
        public StreamBridge streamBridge;

        ExecutorService executorService = Executors.newFixedThreadPool(10);

        @PostConstruct
        public void launchProducer() {
            Runnable task = () -> {
                while (true){
                    streamBridge.send("producer1-out-0", "msg");
                }
            };
            executorService.submit(task);
        }

也在我的控制台中,我收到一个奇怪的消息 Channel 'unknown.channel.name' has 1subscriber(s ) 在启动时,我不知道为什么。

使用 StreamBridge 的发送速度慢是 Spring 的自然限制还是我配置错误? 感谢您的帮助:)

I'm using a Spring Cloud StreamBridge to publish messages to a RabbitMQ exchange. With the native RabbitMQ PerfTest i easily get up to 100k msgs/s (1 channel) using a single producer. If i launch a thread with a while loop with a sending StreamBrige (also 1 channel) i'm only getting ~20k msgs/s with similar settings (no persistence, no manual acks or confirms, same Docker containers..). I'm using Spring Cloud Stream and Rabbit Binder 3.2.2.

My yml looks like this:

spring:
  rabbitmq:
    host: localhost
    port: 5672

  cloud:
    function:
      definition: producer1;

    stream:
      bindings:
        producer1-out-0:
          destination: messageQueue
          #requiredGroups: consumerGroup1,
      rabbit:
        bindings:
          producer1-out-0:
            producer:
              deliveryMode: NON_PERSISTENT
              exchangeType: direct
              bindingRoutingKey: default_message
              routingKeyExpression: '''default_message'''
              #maxLength: 1
      output-bindings: producer1;

and my sending loop, RabbitMQ PerfTest-Tool is written in Java and looks similar:

        @Autowired
        public StreamBridge streamBridge;

        ExecutorService executorService = Executors.newFixedThreadPool(10);

        @PostConstruct
        public void launchProducer() {
            Runnable task = () -> {
                while (true){
                    streamBridge.send("producer1-out-0", "msg");
                }
            };
            executorService.submit(task);
        }

also in my console i'm getting a weird msg Channel 'unknown.channel.name' has 1 subscriber(s) at startup and i don't know why.

Is the slow sending rate using StreamBridge a natural Spring limitation or do i have something misconfigured?
Thanks for help :)

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(1

ぶ宁プ宁ぶ 2025-01-20 00:43:00

在本机 API 之上使用抽象时总会产生一些开销;然而,5x 听起来不太对劲。

我使用 -x 1 -y 1 -a 作为参数,意味着只有 1 个生产者通过自动消费者确认发布消息

这可能解释了这一点;自动确认意味着没有确认 - 当消息发送给消费者时,代理会立即确认消息(存在消息丢失的风险)。 Spring 中的等效项是Acknowledgemode.NONE;默认情况下,容器单独确认每条消息。


https://docs.spring.io/spring-amqp /docs/current/reference/html/#acknowledgeMode

https://docs.spring.io/spring-amqp/docs/current/reference/html/#batchSize

https://docs.spring.io/spring-amqp/docs/current/reference/html/#prefetchCount

Spring AMQP 设置它到默认为 250,但 SCSt 的默认值为 1,这明显较慢。

编辑

有趣; SCSt 似乎确实比 Spring Integration 单独增加了一些显着的开销。

下面测试了原生 Java 客户端的各种场景,并在上面添加了越来越多的 Spring 抽象,最后使用了 StreamBridge;也许应该对其进行分析,看看成本在哪里以及是否可以减轻。

spring.cloud.stream.bindings.output.destination=foo
spring.cloud.stream.rabbit.bindings.output.producer.exchange-type=direct

logging.level.root=warn
@SpringBootApplication
public class So71414000Application {

    public static void main(String[] args) {
        SpringApplication.run(So71414000Application.class, args).close();
    }

    @Bean
    ApplicationRunner runner1(CachingConnectionFactory cf) throws Exception {
        return args -> {
            /*
             * Native java API
             */
            Connection conn = cf.getRabbitConnectionFactory().newConnection("amqp://localhost:1562");
            Channel channel = conn.createChannel();
            byte[] msg = "msg".getBytes();
            AMQP.BasicProperties props = new AMQP.BasicProperties().builder().deliveryMode(1).build();
            int count = 1000000;
            StopWatch watch = watch("native");
            IntStream.range(0, count).forEach(i -> {
                try {
                    channel.basicPublish("foo", "", props, msg);
                }
                catch (IOException e) {
                    e.printStackTrace();
                }
            });
            perf(count, watch);
            channel.close();
            conn.close();
        };
    }

    @Bean
    ApplicationRunner runner2(RabbitTemplate template) {
        return args -> {
            /*
             * Single ChannelProxy, no cache, no conversion
             */
            Message msg = MessageBuilder.withBody("msg".getBytes())
                    .andProperties(MessagePropertiesBuilder.newInstance()
                            .setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT).build()).build();
            StopWatch watch = watch("nocache");
            int count = 1000000;
            template.invoke(t -> {
                IntStream.range(0, count).forEach(i -> t.send("foo", "", msg));
                return null;
            });
            perf(count, watch);
        };
    }

    @Bean
    ApplicationRunner runner3(RabbitTemplate template) {
        return args -> {
            /*
             * ChannelProxy (cached), no conversion
             */
            Message msg = MessageBuilder.withBody("msg".getBytes())
                    .andProperties(MessagePropertiesBuilder.newInstance()
                            .setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT).build()).build();
            StopWatch watch = watch("cached channel");
            int count = 1000000;
            IntStream.range(0, count).forEach(i -> template.send("foo", "", msg));
            perf(count, watch);
        };
    }

    @Bean
    ApplicationRunner runner4(RabbitTemplate template) {
        return args -> {
            /*
             * ChannelProxy (cached), conversion
             */
            StopWatch watch = watch("message conversion");
            int count = 1000000;
            IntStream.range(0, count).forEach(i -> template.convertAndSend("foo", "", "msg"));
            perf(count, watch);
        };
    }

    @Bean
    ApplicationRunner runner5(RabbitTemplate template) {
        return args -> {
            /*
             * Spring Integration
             */
            AmqpOutboundEndpoint outbound = new AmqpOutboundEndpoint(template);
            outbound.setExchangeName("foo");
            outbound.setRoutingKey("");
            DirectChannel channel = new DirectChannel();
            EventDrivenConsumer consumer = new EventDrivenConsumer(channel, outbound);
            consumer.start();
            GenericMessage<?> msg = new GenericMessage<>("foo".getBytes());
            StopWatch watch = watch("Spring Integration");
            int count = 1000000;
            IntStream.range(0, count).forEach(i -> channel.send(msg));
            perf(count, watch);
        };
    }

    @Bean
    ApplicationRunner runner6(StreamBridge bridge) {
        return args -> {
            /*
             * Stream bridge
             */
            StopWatch watch = watch("Stream Bridge");
            int count = 1000000;
            IntStream.range(0, count).forEach(i -> bridge.send("output", "msg"));
            perf(count, watch);
        };
    }


    private StopWatch watch(String name) {
        StopWatch watch = new StopWatch();
        watch.start(name);
        return watch;
    }

    private void perf(int count, StopWatch watch) {
        watch.stop();
        System.out.println(watch.prettyPrint());
        System.out.println((int) ((count) / (watch.getTotalTimeSeconds()) / 1000) + "k/s");
    }

}

在我的 MacBook Air(2018 1.6GHz I5)和裸机代理上得到以下结果:


  .   ____          _            __ _ _
 /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
 \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
  '  |____| .__|_| |_|_| |_\__, | / / / /
 =========|_|==============|___/=/_/_/_/
 :: Spring Boot ::                (v2.6.4)

StopWatch '': running time = 10949129530 ns
---------------------------------------------
ns         %     Task name
---------------------------------------------
10949129530  100%  native

91k/s
StopWatch '': running time = 14175481691 ns
---------------------------------------------
ns         %     Task name
---------------------------------------------
14175481691  100%  nocache

70k/s
StopWatch '': running time = 16300449457 ns
---------------------------------------------
ns         %     Task name
---------------------------------------------
16300449457  100%  cached channel

61k/s
StopWatch '': running time = 18206111556 ns
---------------------------------------------
ns         %     Task name
---------------------------------------------
18206111556  100%  message conversion

54k/s
StopWatch '': running time = 26654581638 ns
---------------------------------------------
ns         %     Task name
---------------------------------------------
26654581638  100%  Spring Integration

37k/s
StopWatch '': running time = 102734493141 ns
---------------------------------------------
ns         %     Task name
---------------------------------------------
102734493141  100%  Stream Bridge

9k/s

There will always be some overheade when using an abstraction on top of the native API; however, 5x doesn't sound right.

i'm using -x 1 -y 1 -a as arguments, means only 1 producer is publishing messages with auto consumer-acks

That probably explains it then; auto ack means no acks - the broker acks the message immediately when it is is sent to the consumer (risking message loss). The equivalent in Spring is Acknowledgemode.NONE; it's default is for the container to ack each message individually.

See
https://docs.spring.io/spring-amqp/docs/current/reference/html/#acknowledgeMode

and

https://docs.spring.io/spring-amqp/docs/current/reference/html/#batchSize

also

https://docs.spring.io/spring-amqp/docs/current/reference/html/#prefetchCount

Spring AMQP sets it to 250 by default, but SCSt's default is 1, which is significantly slower.

EDIT

Interesting; SCSt does appear to add some significant overhead over Spring Integration alone.

The following tests various scenarios from the native Java client and adding more and more Spring abstractions on top, finally using StreamBridge; it should probably be profiled to see where the cost is and whether it can be mitigated.

spring.cloud.stream.bindings.output.destination=foo
spring.cloud.stream.rabbit.bindings.output.producer.exchange-type=direct

logging.level.root=warn
@SpringBootApplication
public class So71414000Application {

    public static void main(String[] args) {
        SpringApplication.run(So71414000Application.class, args).close();
    }

    @Bean
    ApplicationRunner runner1(CachingConnectionFactory cf) throws Exception {
        return args -> {
            /*
             * Native java API
             */
            Connection conn = cf.getRabbitConnectionFactory().newConnection("amqp://localhost:1562");
            Channel channel = conn.createChannel();
            byte[] msg = "msg".getBytes();
            AMQP.BasicProperties props = new AMQP.BasicProperties().builder().deliveryMode(1).build();
            int count = 1000000;
            StopWatch watch = watch("native");
            IntStream.range(0, count).forEach(i -> {
                try {
                    channel.basicPublish("foo", "", props, msg);
                }
                catch (IOException e) {
                    e.printStackTrace();
                }
            });
            perf(count, watch);
            channel.close();
            conn.close();
        };
    }

    @Bean
    ApplicationRunner runner2(RabbitTemplate template) {
        return args -> {
            /*
             * Single ChannelProxy, no cache, no conversion
             */
            Message msg = MessageBuilder.withBody("msg".getBytes())
                    .andProperties(MessagePropertiesBuilder.newInstance()
                            .setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT).build()).build();
            StopWatch watch = watch("nocache");
            int count = 1000000;
            template.invoke(t -> {
                IntStream.range(0, count).forEach(i -> t.send("foo", "", msg));
                return null;
            });
            perf(count, watch);
        };
    }

    @Bean
    ApplicationRunner runner3(RabbitTemplate template) {
        return args -> {
            /*
             * ChannelProxy (cached), no conversion
             */
            Message msg = MessageBuilder.withBody("msg".getBytes())
                    .andProperties(MessagePropertiesBuilder.newInstance()
                            .setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT).build()).build();
            StopWatch watch = watch("cached channel");
            int count = 1000000;
            IntStream.range(0, count).forEach(i -> template.send("foo", "", msg));
            perf(count, watch);
        };
    }

    @Bean
    ApplicationRunner runner4(RabbitTemplate template) {
        return args -> {
            /*
             * ChannelProxy (cached), conversion
             */
            StopWatch watch = watch("message conversion");
            int count = 1000000;
            IntStream.range(0, count).forEach(i -> template.convertAndSend("foo", "", "msg"));
            perf(count, watch);
        };
    }

    @Bean
    ApplicationRunner runner5(RabbitTemplate template) {
        return args -> {
            /*
             * Spring Integration
             */
            AmqpOutboundEndpoint outbound = new AmqpOutboundEndpoint(template);
            outbound.setExchangeName("foo");
            outbound.setRoutingKey("");
            DirectChannel channel = new DirectChannel();
            EventDrivenConsumer consumer = new EventDrivenConsumer(channel, outbound);
            consumer.start();
            GenericMessage<?> msg = new GenericMessage<>("foo".getBytes());
            StopWatch watch = watch("Spring Integration");
            int count = 1000000;
            IntStream.range(0, count).forEach(i -> channel.send(msg));
            perf(count, watch);
        };
    }

    @Bean
    ApplicationRunner runner6(StreamBridge bridge) {
        return args -> {
            /*
             * Stream bridge
             */
            StopWatch watch = watch("Stream Bridge");
            int count = 1000000;
            IntStream.range(0, count).forEach(i -> bridge.send("output", "msg"));
            perf(count, watch);
        };
    }


    private StopWatch watch(String name) {
        StopWatch watch = new StopWatch();
        watch.start(name);
        return watch;
    }

    private void perf(int count, StopWatch watch) {
        watch.stop();
        System.out.println(watch.prettyPrint());
        System.out.println((int) ((count) / (watch.getTotalTimeSeconds()) / 1000) + "k/s");
    }

}

With these results on my MacBook Air (2018 1.6GHz I5) and a bare metal broker:


  .   ____          _            __ _ _
 /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
 \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
  '  |____| .__|_| |_|_| |_\__, | / / / /
 =========|_|==============|___/=/_/_/_/
 :: Spring Boot ::                (v2.6.4)

StopWatch '': running time = 10949129530 ns
---------------------------------------------
ns         %     Task name
---------------------------------------------
10949129530  100%  native

91k/s
StopWatch '': running time = 14175481691 ns
---------------------------------------------
ns         %     Task name
---------------------------------------------
14175481691  100%  nocache

70k/s
StopWatch '': running time = 16300449457 ns
---------------------------------------------
ns         %     Task name
---------------------------------------------
16300449457  100%  cached channel

61k/s
StopWatch '': running time = 18206111556 ns
---------------------------------------------
ns         %     Task name
---------------------------------------------
18206111556  100%  message conversion

54k/s
StopWatch '': running time = 26654581638 ns
---------------------------------------------
ns         %     Task name
---------------------------------------------
26654581638  100%  Spring Integration

37k/s
StopWatch '': running time = 102734493141 ns
---------------------------------------------
ns         %     Task name
---------------------------------------------
102734493141  100%  Stream Bridge

9k/s
~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文