Sleuth 3.0.3 not working with Kafka in Kafka logs
I am new to Sleuth.
I added the Sleuth dependency as follows.
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-sleuth</artifactId>
</dependency>
and made the corresponding changes to the log pattern:
<PatternLayout pattern="%d{HH:mm:ss.SSS} [%tid] [%t] [%X{traceId},%X{spanId}] %-5level %logger{36} - %msg%n"/>
For normal logs I get the traceId and spanId, as below:
18:54:28.876 [42] [http-nio-8082-exec-2] [04287c5af43cc1d2,04287c5af43cc1d2] INFO - ajax event received
When the data is sent to Kafka, the log line from the producer callback has an empty traceId and spanId, as shown below:
18:54:30.763 [72] [kafka-producer-network-thread | producer-1] [,] INFO - event successfully send to kafka topic at time 1645709070763
The Kafka producer is shown below.
@Component
@AllArgsConstructor
public class KafkaProducer {
    private static final Logger log = LogManager.getLogger(KafkaProducer.class);
    private final ProducerConfiguration producerConfiguration;

    public Future<RecordMetadata> produceEvents(GenericRecord genericRecord, String topic, String sessionId) {
        Producer<String, GenericRecord> producer = new KafkaProducer<>(producerConfiguration.getProducerProperties());
        ProducerRecord<String, GenericRecord> producerRecord = new ProducerRecord<>(
            topic, sessionId, genericRecord
        );
        Future<RecordMetadata> data = producer.send(producerRecord,
            (RecordMetadata metadata, Exception exception) -> {
                if (exception == null) {
                    log.info("event successfully send to kafka topic at time {}", System.currentTimeMillis());
                } else {
                    log.info(exception.getMessage());
                }
            }
        );
        producer.flush();
        producer.close();
        return data;
    }
}
The producer configuration is shown below.
@Component
@Getter
@Setter
public class ProducerConfiguration {
    private final Properties producerProperties;

    @Value("${kafka.bootstrapserver}")
    private String bootstrapServer;
    @Value("${kafka.acks}")
    private String acks;
    @Value("${kafka.retries}")
    private String retries;
    @Value("${kafka.max.in.flight.requests.per.connection}")
    private String maxInFlightRequestsPerConnection;
    @Value("${kafka.enable.idempotence}")
    private String enableIdempotence;

    public ProducerConfiguration(Properties producerProperties) {
        this.producerProperties = producerProperties;
    }

    @Bean
    private void setProperties() {
        this.producerProperties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
        this.producerProperties.setProperty(ProducerConfig.ACKS_CONFIG, acks);
        this.producerProperties.setProperty(ProducerConfig.RETRIES_CONFIG, retries);
        this.producerProperties.setProperty(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, maxInFlightRequestsPerConnection);
        this.producerProperties.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, enableIdempotence);
        this.producerProperties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        this.producerProperties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, AvroSerializer.class.getName());
        this.producerProperties.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, Collections.singletonList(TracingProducerInterceptor.class));
    }
}
I am not able to figure out why this is happening.
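One possible explanation, offered as a sketch rather than a confirmed diagnosis: the second log line is written from the send callback, which runs on the kafka-producer-network-thread, while the traceId and spanId in the logging context are stored per thread and were set on the http-nio request thread. The example below uses only the JDK to illustrate that effect. The class ContextPropagationSketch, the CONTEXT map and the method traceIdOnOtherThread are hypothetical names standing in for the logging context and the producer I/O thread; they are not part of Sleuth, Log4j2 or Kafka.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ContextPropagationSketch {
    // Hypothetical stand-in for a thread-bound logging context (one map per thread).
    private static final ThreadLocal<Map<String, String>> CONTEXT =
            ThreadLocal.withInitial(HashMap::new);

    // Sets a traceId on the calling thread, then reads it from a second thread.
    // When propagate is true, a snapshot of the caller's context is copied over first.
    static String traceIdOnOtherThread(boolean propagate) throws Exception {
        CONTEXT.get().put("traceId", "04287c5af43cc1d2");
        // Snapshot taken on the calling ("request") thread.
        Map<String, String> captured = new HashMap<>(CONTEXT.get());

        // Single worker thread, playing the role of the producer's I/O thread.
        ExecutorService ioThread = Executors.newSingleThreadExecutor();
        try {
            return ioThread.submit(() -> {
                if (propagate) {
                    CONTEXT.get().putAll(captured);
                }
                try {
                    return CONTEXT.get().getOrDefault("traceId", "");
                } finally {
                    // Clean up so the worker thread does not keep stale values.
                    CONTEXT.get().clear();
                }
            }).get();
        } finally {
            ioThread.shutdown();
            CONTEXT.remove();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("[" + traceIdOnOtherThread(false) + "]"); // prints "[]"
        System.out.println("[" + traceIdOnOtherThread(true) + "]");  // prints "[04287c5af43cc1d2]"
    }
}
```

Without propagation the worker thread sees an empty value, which matches the `[,]` in the Kafka log line. If this is indeed the cause, the same capture-and-restore idea could be applied inside the send callback using the real logging context, or the trace identifiers could be logged on the request thread before calling send.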