How to configure Azure Service Bus to send messages in batches?

Posted 2025-02-10 12:26:47


The objective is to consume records from Kafka and redirect them to Azure Service Bus. The Kafka consumer can be configured (and this is confirmed while debugging) to consume thousands of messages each time it polls the Kafka topic.

The Azure Service Bus producer, on the other hand, always publishes a single message at a time. So when this integration runs, the Kafka logs show that thousands of messages were received, and then the Azure Service Bus logs iterate over each one, sending them to the queue one at a time. Each iteration takes several minutes, slowing the process down significantly.

The component documentation states that sending in batches is the default, but it is less than clear on precisely how to achieve that.

public class SampleKafkaConsumer extends RouteBuilder {
  @Override
  public void configure() throws Exception {
    log.info("About to start route: Kafka Server -> Log ");
    from("kafka:{{consumer.topic}}?brokers={{kafka.host}}:{{kafka.port}}"
             + "&maxPollRecords={{consumer.maxPollRecords}}"
             + "&consumersCount={{consumer.consumersCount}}"
             + "&seekTo={{consumer.seekTo}}"
             + "&groupId={{consumer.group}}"
             + "&lingerMs={{consumer.lingerMs}}"
             + "&producerBatchSize={{consumer.producerBatchSize}}"
             + "&saslJaasConfig={{consumer.saslJaasConfig}}"
             + "&saslMechanism={{consumer.saslMechanism}}"
             + "&securityProtocol={{consumer.securityProtocol}}")
             .routeId("Kafka")
             .to("azure-servicebus:topic?connectionString={{producer.connectionString}}&producerOperation=sendMessages");
             
  }
}

Any insight on how to approach this?


Comments (1)

鯉魚旗 2025-02-17 12:26:47


The solution the Azure Service Bus docs were alluding to is to batch the results up as a list of objects or messages and pass that list into the Azure producer.

The key to doing this in Apache Camel is to use an Aggregator. This, coupled with a completion interval as in the code below, allows the route to group messages together and send them to the Service Bus in batches.

public class SampleKafkaConsumer extends RouteBuilder {
  @Override
  public void configure() throws Exception {
    log.info("About to start route: Kafka Server -> Log ");
    from("kafka:{{consumer.topic}}?brokers={{kafka.host}}:{{kafka.port}}"
             + "&maxPollRecords={{consumer.maxPollRecords}}"
             + "&consumersCount={{consumer.consumersCount}}"
             + "&seekTo={{consumer.seekTo}}"
             + "&groupId={{consumer.group}}"
             + "&lingerMs={{consumer.lingerMs}}"
             + "&producerBatchSize={{consumer.producerBatchSize}}"
             + "&saslJaasConfig={{consumer.saslJaasConfig}}"
             + "&saslMechanism={{consumer.saslMechanism}}"
             + "&securityProtocol={{consumer.securityProtocol}}")
             .routeId("Kafka")
             // constant(true) correlates every exchange into a single
             // aggregation group so all messages land in one batch
             .aggregate(new AzureAggregationStrategy()).constant(true)
             // flush the accumulated batch every 300 ms
             .completionInterval(300L)
             .to("azure-servicebus:topic?connectionString={{producer.connectionString}}&producerOperation=sendMessages");
             
  }
}
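The answer references an `AzureAggregationStrategy` without showing it. One possible implementation is a strategy that collects each incoming body into a `List`, which the azure-servicebus producer can then send as one batch; this is a hedged sketch (the class body is not from the original answer), assuming a Camel 3.x classpath where `AggregationStrategy` lives in `org.apache.camel`:

```java
import java.util.ArrayList;
import java.util.List;

import org.apache.camel.AggregationStrategy;
import org.apache.camel.Exchange;

// Hypothetical sketch of the strategy the route refers to: it appends each
// new message body to a running List so the downstream producer receives
// one List-bodied exchange per completion interval.
public class AzureAggregationStrategy implements AggregationStrategy {

    @Override
    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
        Object newBody = newExchange.getIn().getBody();

        if (oldExchange == null) {
            // First message of the batch: start a fresh list.
            List<Object> batch = new ArrayList<>();
            batch.add(newBody);
            newExchange.getIn().setBody(batch);
            return newExchange;
        }

        // Subsequent messages: append to the accumulated list.
        @SuppressWarnings("unchecked")
        List<Object> batch = oldExchange.getIn().getBody(List.class);
        batch.add(newBody);
        return oldExchange;
    }
}
```

With this in place, each exchange reaching the `to("azure-servicebus:...")` endpoint carries a `List` body rather than a single record, which is what `producerOperation=sendMessages` needs in order to send a batch.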