如何用度量来计算IDEMTOTENT消费者的重复消息?

发布于 2025-01-20 17:36:27 字数 1346 浏览 3 评论 0原文

我有一个Apache Camel应用程序,带有 。我需要一个具有重复消息总数的度量。我该如何实施这样的指标?

代码

@SpringBootApplication
public class TestApplication {

  public static void main(String[] args) {
    SpringApplication.run(TestApplication.class, args);
  }

  @Bean
  public MicrometerRoutePolicyFactory micrometerRoutePolicyFactory() {
    return new MicrometerRoutePolicyFactory();
  }

  @Bean
  public EndpointRouteBuilder route() {
    return new EndpointRouteBuilder() {
      @Override
      public void configure() throws Exception {
          from(file("d:/tmp/camel/"))
             .idempotentConsumer(jsonpath("$.id"), MemoryIdempotentRepository.memoryIdempotentRepository())
             .to(file("d:/tmp/copy/"));
      }
    };
  }
}

研究

我研究了 micrometerconstants ,但是我找不到重复的指标消息。

问题

如何用度量计算愿意消费者的重复消息数量?

I have an Apache Camel application with an Idempotent Consumer. I need a metric with the total number of duplicated messages. How could I implement such a metric?

Code

@SpringBootApplication
public class TestApplication {

  public static void main(String[] args) {
    SpringApplication.run(TestApplication.class, args);
  }

  @Bean
  public MicrometerRoutePolicyFactory micrometerRoutePolicyFactory() {
    return new MicrometerRoutePolicyFactory();
  }

  @Bean
  public EndpointRouteBuilder route() {
    return new EndpointRouteBuilder() {
      @Override
      public void configure() throws Exception {
          from(file("d:/tmp/camel/"))
             .idempotentConsumer(jsonpath("$.id"), MemoryIdempotentRepository.memoryIdempotentRepository())
             .to(file("d:/tmp/copy/"));
      }
    };
  }
}

Research

I looked into MicrometerConstants, but I couldn't find a metric for duplicate messages.

Question

How can I count the number of duplicate messages for Idempotent Consumer with a metric?

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(1

音盲 2025-01-27 17:36:27

我找到了解决方法,请参阅
幂等消费者

如何处理路由中的重复消息
从 Camel 2.8 开始提供

您现在可以将 skipDuplicate 选项设置为 false,这会指示幂等使用者也路由重复消息。但是,通过将 Exchange 上的属性设置为 true,重复邮件已被标记为重复。我们可以通过使用基于内容的路由器或消息过滤器来检测这一点并处理重复的消息,从而利用这一事实。

例如,在下面的示例中,我们使用消息过滤器将消息发送到重复端点,然后停止继续路由该消息。

过滤重复消息

from("direct:start")
    // 指示幂等消费者不要跳过重复项,因为我们将过滤我们自己
    .idempotConsumer(header("messageId")).messageIdRepository(repo).skipDuplicate(false)
    .filter(属性(Exchange.DUPLICATE_MESSAGE).isEqualTo(true))
        // 通过将重复消息发送到其他地方来过滤掉它们,然后停止
        .to("模拟:重复")
        。停止()
    。结尾()
    // 这里我们只处理新消息(没有重复消息)
    .to("模拟:结果");

MicrometerBuilders#micrometer

默认 MicrometerEndpointBuilderFactory.MicrometerEndpointBuilder 千分尺(字符串路径)

Micrometer (camel-micrometer) 使用 Micrometer 库直接从 Camel 路线收集各种指标。类别:监控 自:2.22 Maven 坐标:org.apache.camel:camel-micrometer 语法:micrometer:metricsType:metricsName 路径参数:metricsType(必填) 指标类型 有 6 个枚举,取值可以可以是以下之一:COUNTER、GAUGE、LONG_TASK_TIMER、TIMER、DISTRIBUTION_SUMMARY、OTHER 路径参数:metricsName(必需) 的名称metrics 路径参数:tags 指标的标签

参数:
路径 - 指标类型:指标名称

退货:
DSL 构建器

我修改的代码:

@Bean
public EndpointRouteBuilder route() {
  return new EndpointRouteBuilder() {
    @Override
    public void configure() throws Exception {
       from(file("d:/tmp/camel/"))
         .idempotentConsumer(jsonpath("$.id"), MemoryIdempotentRepository.memoryIdempotentRepository())
           .skipDuplicate(false)
         .filter(header(Exchange.DUPLICATE_MESSAGE).isEqualTo(true))
           .to(micrometer("duplicate_messages").increment("1"))
           .stop()
         .end()
         .to(file("d:/tmp/copy/"));
    }
  };
}

I found a workaround, see
Idempotent Consumer
:

How to handle duplicate messages in the route
Available as of Camel 2.8

You can now set the skipDuplicate option to false which instructs the idempotent consumer to route duplicate messages as well. However the duplicate message has been marked as duplicate by having a property on the Exchange set to true. We can leverage this fact by using a Content Based Router or Message Filter to detect this and handle duplicate messages.

For example in the following example we use the Message Filter to send the message to a duplicate endpoint, and then stop continue routing that message.

Filter duplicate messages

from("direct:start")
    // instruct idempotent consumer to not skip duplicates as we will filter then our self
    .idempotentConsumer(header("messageId")).messageIdRepository(repo).skipDuplicate(false)
    .filter(property(Exchange.DUPLICATE_MESSAGE).isEqualTo(true))
        // filter out duplicate messages by sending them to someplace else and then stop
        .to("mock:duplicate")
        .stop()
    .end()
    // and here we process only new messages (no duplicates)
    .to("mock:result");

and MicrometerBuilders#micrometer:

default MicrometerEndpointBuilderFactory.MicrometerEndpointBuilder micrometer(String path)

Micrometer (camel-micrometer) Collect various metrics directly from Camel routes using the Micrometer library. Category: monitoring Since: 2.22 Maven coordinates: org.apache.camel:camel-micrometer Syntax: micrometer:metricsType:metricsName Path parameter: metricsType (required) Type of metrics There are 6 enums and the value can be one of: COUNTER, GAUGE, LONG_TASK_TIMER, TIMER, DISTRIBUTION_SUMMARY, OTHER Path parameter: metricsName (required) Name of metrics Path parameter: tags Tags of metrics

Parameters:
path - metricsType:metricsName

Returns:
the dsl builder

My modified code:

@Bean
public EndpointRouteBuilder route() {
  return new EndpointRouteBuilder() {
    @Override
    public void configure() throws Exception {
       from(file("d:/tmp/camel/"))
         .idempotentConsumer(jsonpath("$.id"), MemoryIdempotentRepository.memoryIdempotentRepository())
           .skipDuplicate(false)
         .filter(header(Exchange.DUPLICATE_MESSAGE).isEqualTo(true))
           .to(micrometer("duplicate_messages").increment("1"))
           .stop()
         .end()
         .to(file("d:/tmp/copy/"));
    }
  };
}
~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文