org.apache.kafka.clients.producer.KafkaProducer#doSend throws OOM error | KafkaTemplate
I am using KafkaTemplate to produce messages to a topic, but I get an OOM error when KafkaProducer calls doSend(). I'm not sure whether the two are related.
The pod is given 2Gi of memory, and we are processing around 10K messages in batches of 200.
The code below is where it all goes wrong.
final ProducerRecord<String, byte[]> record =
        new ProducerRecord<>(message.getTopicIdentifier(), message.getKey(), message.getData());
message.getMetaData().getKeyValueMap()
        .forEach((key, value) -> record.headers().add(key, value.getBytes()));
ListenableFuture<SendResult<String, byte[]>> listenableFuture = kafkaTemplate.send(record);
listenableFuture.addCallback(new ListenableFutureCallback<>() {
    @Override
    public void onSuccess(SendResult<String, byte[]> result) {
        log.debug("Message {} published to Kafka successfully", message.getId());
    }

    @Override
    public void onFailure(Throwable ex) {
        log.error("[onFailure] Error processing message: {}", message.getId(), ex);
    }
});
MessageDTO and the other related classes look like:
public class Metadata {
    private Map<String, String> keyValueMap;
}

public class MessageDTO {
    private Long id;
    private byte[] data;
    private String key;
    private String status;
    private String reason;
    private String topicIdentifier;
    private Date createdDate;
    private Date updateDate;
    private Metadata metaData;
}
Below are the Spring producer config and the Kubernetes resources allotted to the pod:
spring:
  kafka:
    producer:
      bootstrapServers: **server**
      acks: all
      compressionType: lz4
      retries: 5

resources:
  limits:
    cpu: 1000m
    memory: 2048Mi
  requests:
    cpu: 4m
    memory: 128Mi
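For reference, the producer properties that bound producer-side memory are not set here, so defaults apply. A hedged sketch of making them explicit in Spring Boot YAML (the values shown are the Kafka defaults, not recommendations):

```yaml
spring:
  kafka:
    producer:
      # buffer.memory: total bytes the producer will buffer before send() blocks
      # (default 32 MiB) -- this, not the 200-message app batch, caps producer memory
      buffer-memory: 33554432
      # batch.size: per-partition batch in bytes (default 16 KiB)
      batch-size: 16384
```

Note that with these defaults the producer itself holds at most ~32 MiB, so if OOM occurs during doSend() it is more likely the application-level batch of byte[] payloads, or a too-small JVM heap, than the producer buffer.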
评论(1)
You're processing the messages in batches of 200 records. Try reducing the batch size to 10 or even 1 and check whether the OOM stops; then increase it gradually while watching the memory metrics to find the right batch size for one application instance. Then you can scale up or scale out.
Also check that the JVM is properly configured and actually using the memory available to the pod; that might allow a bigger batch size per application instance.
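The "bounded batch" idea above can be sketched as follows. This is a self-contained illustration, not the asker's code: `sendAsync` is a hypothetical stand-in for `kafkaTemplate.send(record)` (which also returns a future), and the point is to block on each batch's futures before submitting the next, so at most `batchSize` payloads are in flight and retained at once.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class BatchedSender {

    // Hypothetical stand-in for kafkaTemplate.send(record),
    // which likewise returns a future for the send result.
    static CompletableFuture<Integer> sendAsync(int messageId) {
        return CompletableFuture.supplyAsync(() -> messageId);
    }

    // Sends messages in bounded batches, waiting for the whole batch to
    // complete before starting the next one, so at most batchSize sends
    // (and their payloads) are in flight at any moment.
    static int sendInBatches(List<Integer> messages, int batchSize) {
        int sent = 0;
        for (int start = 0; start < messages.size(); start += batchSize) {
            int end = Math.min(start + batchSize, messages.size());
            List<CompletableFuture<Integer>> inFlight = new ArrayList<>();
            for (int i = start; i < end; i++) {
                inFlight.add(sendAsync(messages.get(i)));
            }
            // Backpressure: block until every send in this batch has completed.
            CompletableFuture.allOf(inFlight.toArray(new CompletableFuture[0])).join();
            sent += inFlight.size();
        }
        return sent;
    }

    public static void main(String[] args) {
        List<Integer> messages = new ArrayList<>();
        for (int i = 0; i < 10_000; i++) {
            messages.add(i);
        }
        System.out.println(sendInBatches(messages, 200));
    }
}
```

On the JVM side: inside a container, HotSpot's default max heap is only 25% of the container memory limit (`-XX:MaxRAMPercentage=25.0`), so a 2048Mi pod may give the heap roughly 512MiB. Raising it, e.g. via `-XX:MaxRAMPercentage=75.0`, may be what "using the available memory" requires here.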