org.apache.kafka.clients.producer.KafkaProducer#doSend throws an OOM error | KafkaTemplate

Posted on 2025-01-12 13:13:52


I am using KafkaTemplate to produce messages to a topic, but I get an OOM error while KafkaProducer calls doSend(). I'm not sure whether that call is actually the cause.
The pod is given 2Gi of memory, and we are processing around 10K messages in batches of 200.

The code below is where it all goes wrong:

final ProducerRecord<String, byte[]> record =
    new ProducerRecord<>(message.getTopicIdentifier(), message.getKey(), message.getData());

message.getMetaData().getKeyValueMap().forEach((key, value) ->
    record.headers().add(key, value.getBytes(StandardCharsets.UTF_8)));
ListenableFuture<SendResult<String, byte[]>> listenableFuture = kafkaTemplate.send(record);
listenableFuture.addCallback(new ListenableFutureCallback<>() {

    @Override
    public void onSuccess(SendResult<String, byte[]> result) {
        log.debug("Message : {} published to Kafka successful", message.getId());
    }
    @Override
    public void onFailure(Throwable ex) {
        log.error("[onFailure] Error processing message: {}", message.getId(), ex);
    }
});
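Nothing in the snippet above is obviously wrong on its own; the usual way this pattern runs out of memory is sending all ~10K records in one tight loop, so every ProducerRecord, pending callback, and buffered batch is alive at the same time. A minimal sketch of splitting the work into chunks (the `ChunkUtil` helper and the batch size of 200 are illustrative, not from the original code) so that each chunk can be sent and flushed before the next one is built:

```java
import java.util.ArrayList;
import java.util.List;

public class ChunkUtil {

    /** Split a list into consecutive sublists of at most {@code size} elements. */
    static <T> List<List<T>> chunks(List<T> items, int size) {
        List<List<T>> result = new ArrayList<>();
        for (int i = 0; i < items.size(); i += size) {
            result.add(items.subList(i, Math.min(i + size, items.size())));
        }
        return result;
    }

    public static void main(String[] args) {
        List<Integer> ids = new ArrayList<>();
        for (int i = 0; i < 10_000; i++) ids.add(i);

        List<List<Integer>> batches = chunks(ids, 200);
        System.out.println(batches.size());        // 50
        System.out.println(batches.get(0).size()); // 200
        // In the real application, each batch would be sent via kafkaTemplate.send(...)
        // and then kafkaTemplate.flush() called, bounding the memory held by pending sends.
    }
}
```

Flushing between chunks trades some throughput for a hard bound on in-flight records, which makes it easier to find a batch size that fits the pod's memory.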

MessageDTO and the other related classes look like this:

public class Metadata {

    private Map<String,String> keyValueMap;
}

public class MessageDTO {

    private Long id;
    private byte[] data;
    private String key;
    private String status;
    private String reason;
    private String topicIdentifier;
    private Date createdDate;
    private Date updateDate;
    private Metadata metaData;
}

Below are the producer config and the resources allotted to the pod:

spring:
  kafka:
    producer:
      bootstrapServers: **server**
      acks: all
      compressionType: lz4
      retries: 5

  resources:
    limits:
      cpu: 1000m
      memory: 2048Mi
    requests:
      cpu: 4m
      memory: 128Mi
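For reference, the Kafka producer's own memory use is bounded mainly by `buffer.memory` (default 32 MB) and `batch.size` (default 16 KB), which is small relative to a 2Gi pod; that makes the application loop a more likely OOM source than the producer itself. A sketch of pinning these explicitly in the same Spring Boot YAML style (the values shown are just the Kafka defaults, not taken from the original config):

```
spring:
  kafka:
    producer:
      bootstrapServers: **server**
      acks: all
      compressionType: lz4
      retries: 5
      batch-size: 16384        # bytes per in-memory record batch (Kafka default)
      buffer-memory: 33554432  # 32 MB total producer buffer (Kafka default)
```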

Comments (1)

阳光下的泡沫是彩色的 2025-01-19 13:13:52


You’re using 200 record batches. You can try reducing the batch size to 10 or even 1 and check if it stops throwing OOM - then increase it gradually while watching the memory metrics to find the right batch size for one application instance. Then you can scale up or scale out.

Also check whether the JVM is properly configured and actually using the available memory - that might allow a bigger batch size per application instance.
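One concrete thing to rule out: on JDK 10+ the default maximum heap inside a container is roughly 25% of the container memory limit, so a 2Gi pod may leave the JVM only about 512 MB of heap unless the percentage is raised (e.g. via `-XX:MaxRAMPercentage`). A quick way to check what the JVM actually got:

```java
public class HeapCheck {
    public static void main(String[] args) {
        // Maximum heap the JVM will attempt to use; compare against the pod's 2Gi limit.
        long maxHeapMb = Runtime.getRuntime().maxMemory() / (1024 * 1024);
        System.out.println("Max heap: " + maxHeapMb + " MB");
    }
}
```

If this prints far less than expected, starting the app with something like `-XX:MaxRAMPercentage=75.0` lets the heap use most of the container limit.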
