Sleuth 3.0.3 无法在 kafka 日志中与 Kafka 配合使用

发布于 2025-01-09 17:38:08 字数 3586 浏览 0 评论 0原文

我是侦探新手。

我添加了侦探依赖项,如下所示。

        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-sleuth</artifactId>
        </dependency>

并对正常日志的日志模式进行了相应的更改,

<PatternLayout pattern="%d{HH:mm:ss.SSS} [%tid] [%t] [%X{traceId},%X{spanId}] %-5level %logger{36} - %msg%n"/>

当数据发送到kafka时,我得到的traceId和spanId如下

18:54:28.876 [42] [http-nio-8082-exec-2] [04287c5af43cc1d2,04287c5af43cc1d2] INFO  - ajax event received

所示。

18:54:30.763 [72] [kafka-producer-network-thread | producer-1] [,] INFO  - event successfully send to kafka topic at time 1645709070763

kafka生产者如下图所示。

@Component
@AllArgsConstructor
public class KafkaProducer {

    private static final Logger log = LogManager.getLogger(KafkaProducer.class);

    private final ProducerConfiguration producerConfiguration;

    public Future<RecordMetadata> produceEvents(GenericRecord genericRecord, String topic, String sessionId) {

        Producer<String, GenericRecord> producer = new KafkaProducer<>(producerConfiguration.getProducerProperties());

        ProducerRecord<String, GenericRecord> producerRecord = new ProducerRecord<>(
                topic, sessionId, genericRecord
        );

        Future<RecordMetadata> data = producer.send(producerRecord,
                (RecordMetadata metadata, Exception exception) ->{
                if (exception == null) {
                    log.info("event successfully send to kafka topic at time {}", System.currentTimeMillis());
                } else {
                    log.info(exception.getMessage());
                }
            }
        );

        producer.flush();
        producer.close();
        return data;
    }
}

生产者配置。

@Component
@Getter
@Setter
public class ProducerConfiguration {

    private final Properties producerProperties;

    @Value("${kafka.bootstrapserver}")
    private String bootstrapServer;
    @Value("${kafka.acks}")
    private String acks;
    @Value("${kafka.retries}")
    private String retries;
    @Value("${kafka.max.in.flight.requests.per.connection}")
    private String maxInFlightRequestsPerConnection;
    @Value("${kafka.enable.idempotence}")
    private String enableIdempotence;

    public ProducerConfiguration(Properties producerProperties){
        this.producerProperties = producerProperties;
    }

    @Bean
    private void setProperties(){
        this.producerProperties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
        
        this.producerProperties.setProperty(ProducerConfig.ACKS_CONFIG, acks);
        this.producerProperties.setProperty(ProducerConfig.RETRIES_CONFIG, retries);
        this.producerProperties.setProperty(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, maxInFlightRequestsPerConnection);
        this.producerProperties.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, enableIdempotence);
        this.producerProperties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer .class.getName());
        this.producerProperties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, AvroSerializer .class.getName());
        this.producerProperties.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, Collections.singletonList(TracingProducerInterceptor.class));

    }
}

我无法弄清楚为什么会发生这种情况。

Iam new to sleuth.

I added the sleuth dependecny as follows.

        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-sleuth</artifactId>
        </dependency>

and made the corresponding changes in log pattern

<PatternLayout pattern="%d{HH:mm:ss.SSS} [%tid] [%t] [%X{traceId},%X{spanId}] %-5level %logger{36} - %msg%n"/>

for normal logs I get the traceId and spanId as below

18:54:28.876 [42] [http-nio-8082-exec-2] [04287c5af43cc1d2,04287c5af43cc1d2] INFO  - ajax event received

when the data is send to kafka it is shown as below.

18:54:30.763 [72] [kafka-producer-network-thread | producer-1] [,] INFO  - event successfully send to kafka topic at time 1645709070763

The kafka producer is as shown below.

@Component
@AllArgsConstructor
public class KafkaProducer {

    private static final Logger log = LogManager.getLogger(KafkaProducer.class);

    private final ProducerConfiguration producerConfiguration;

    public Future<RecordMetadata> produceEvents(GenericRecord genericRecord, String topic, String sessionId) {

        Producer<String, GenericRecord> producer = new KafkaProducer<>(producerConfiguration.getProducerProperties());

        ProducerRecord<String, GenericRecord> producerRecord = new ProducerRecord<>(
                topic, sessionId, genericRecord
        );

        Future<RecordMetadata> data = producer.send(producerRecord,
                (RecordMetadata metadata, Exception exception) ->{
                if (exception == null) {
                    log.info("event successfully send to kafka topic at time {}", System.currentTimeMillis());
                } else {
                    log.info(exception.getMessage());
                }
            }
        );

        producer.flush();
        producer.close();
        return data;
    }
}

The producer configuration.

@Component
@Getter
@Setter
public class ProducerConfiguration {

    private final Properties producerProperties;

    @Value("${kafka.bootstrapserver}")
    private String bootstrapServer;
    @Value("${kafka.acks}")
    private String acks;
    @Value("${kafka.retries}")
    private String retries;
    @Value("${kafka.max.in.flight.requests.per.connection}")
    private String maxInFlightRequestsPerConnection;
    @Value("${kafka.enable.idempotence}")
    private String enableIdempotence;

    public ProducerConfiguration(Properties producerProperties){
        this.producerProperties = producerProperties;
    }

    @Bean
    private void setProperties(){
        this.producerProperties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
        
        this.producerProperties.setProperty(ProducerConfig.ACKS_CONFIG, acks);
        this.producerProperties.setProperty(ProducerConfig.RETRIES_CONFIG, retries);
        this.producerProperties.setProperty(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, maxInFlightRequestsPerConnection);
        this.producerProperties.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, enableIdempotence);
        this.producerProperties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer .class.getName());
        this.producerProperties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, AvroSerializer .class.getName());
        this.producerProperties.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, Collections.singletonList(TracingProducerInterceptor.class));

    }
}

Iam not able to figure out why this is happening.

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。
列表为空,暂无数据
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文