Kafka transaction rollback does not work with RecordTooLargeException for 3 topics



I post 3 messages to 3 topics inside a transaction - if an exception occurs while posting, all of the messages should be rolled back.

But in my case that does not happen when I simulate the exception below for the 3rd topic:
org.apache.kafka.common.errors.RecordTooLargeException: The message is 117440606 bytes

While posting the large message to the 3rd topic (the price topic), I programmatically increase the size of the message to force the exception.

The messages are sent to the first 2 topics successfully - the 3rd one fails. Per transaction semantics, all of the messages must be rolled back - but topics 1 and 2 receive their messages every time.

Yet the LOG shows - Transaction rolled back

How do I fix this issue?

Log

2022-03-23 21:16:59.690 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO  c.a.m.r.producer.KafkaProducer - @@@ --- Sending Data to  Item , price, Inventory  ----- 
2022-03-23 21:16:59.733 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] ERROR o.s.k.s.LoggingProducerListener - Exception thrown when sending a message with key='String' and payload='{"sku":"String","lowestOriginalPrice":...' to topic PRICE-TOPIC: 
org.apache.kafka.common.errors.RecordTooLargeException: The message is 117440606 bytes when serialized which is larger than 1048576, which is the value of the max.request.size configuration.
2022-03-23 21:16:59.733 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO  o.a.k.c.producer.KafkaProducer - [Producer clientId=raw-item-producer-client-2, transactionalId=tx-5b01ad71-f754-44e0-9c52-4774a482bc1d0] Aborting incomplete transaction 
2022-03-23 21:16:59.737 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO  o.a.k.c.producer.KafkaProducer - [Producer clientId=raw-item-producer-client-1, transactionalId=tx-5b01ad71-f754-44e0-9c52-4774a482bc1draw-item-processor-group-id.OSMI_C02_CATALOG_MKPDOMAIN.0] Aborting incomplete transaction 
2022-03-23 21:16:59.738 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] ERROR o.s.k.l.KafkaMessageListenerContainer - Transaction rolled back 
org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method 'public void com.albertsons.mkp.rawitemprocessor.consumer.KafkaConsumer.receive(org.apache.kafka.clients.consumer.ConsumerRecord<java.lang.String, java.lang.String>) throws java.io.IOException' threw exception; nested exception is org.springframework.kafka.KafkaException: Send failed; nested exception is org.apache.kafka.common.errors.RecordTooLargeException: The message is 117440606 bytes when serialized which is larger than 1048576, which is the value of the max.request.size configuration.; nested exception is org.springframework.kafka.KafkaException: Send failed; nested exception is org.apache.kafka.common.errors.RecordTooLargeException: The message is 117440606 bytes when serialized which is larger than 1048576, which is the value of the max.request.size configuration.

2022-03-23 21:17:00.250 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO  c.a.m.r.producer.KafkaProducer - @@@ --- Sending Data to  Item , price, Inventory  ----- 
2022-03-23 21:17:00.294 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] ERROR o.s.k.s.LoggingProducerListener - Exception thrown when sending a message with key='String' and payload='{"sku":"String","lowestOriginalPrice":"String","lowestPrice":"String","updatedAt":"String","createdA...' to topic PRICE-TOPIC: 
org.apache.kafka.common.errors.RecordTooLargeException: The message is 117440606 bytes when serialized which is larger than 1048576, which is the value of the max.request.size configuration.
2022-03-23 21:17:00.295 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO  o.a.k.c.producer.KafkaProducer - [Producer clientId=raw-item-producer-client-2, transactionalId=tx-5b01ad71-f754-44e0-9c52-4774a482bc1d0] Aborting incomplete transaction 
2022-03-23 21:17:00.298 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO  o.a.k.c.producer.KafkaProducer - [Producer clientId=raw-item-producer-client-1, transactionalId=tx-5b01ad71-f754-44e0-9c52-4774a482bc1draw-item-processor-group-id.OSMI_C02_CATALOG_MKPDOMAIN.0] Aborting incomplete transaction 
2022-03-23 21:17:00.308 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] ERROR o.s.k.l.KafkaMessageListenerContainer - Transaction rolled back 
org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method 'public void com.albertsons.mkp.rawitemprocessor.consumer.KafkaConsumer.receive(org.apache.kafka.clients.consumer.ConsumerRecord<java.lang.String, java.lang.String>) throws java.io.IOException' threw exception; nested exception is org.springframework.kafka.KafkaException: Send failed; nested exception is org.apache.kafka.common.errors.RecordTooLargeException: The message is 117440606 bytes when serialized which is larger than 1048576, which is the value of the max.request.size configuration.; nested exception is org.springframework.kafka.KafkaException: Send failed; nested exception is org.apache.kafka.common.errors.RecordTooLargeException: The message is 117440606 bytes when serialized which is larger than 1048576, which is the value of the max.request.size configuration.



Answer by 浪推晚风 (2025-01-23 10:28:42):


Rolled back records remain in the log.

Kafka adds a marker to the log to indicate whether the transaction was committed or rolled back.

By default, consumers will receive all records, even if they are rolled back.

Consumers must be configured with isolation.level=read_committed to avoid seeing rolled back records.

https://kafka.apache.org/documentation/#consumerconfigs_isolation.level

Controls how to read messages written transactionally. If set to read_committed, consumer.poll() will only return transactional messages which have been committed. If set to read_uncommitted (the default), consumer.poll() will return all messages, even transactional messages which have been aborted. Non-transactional messages will be returned unconditionally in either mode.

Messages will always be returned in offset order. Hence, in read_committed mode, consumer.poll() will only return messages up to the last stable offset (LSO), which is the one less than the offset of the first open transaction. In particular any messages appearing after messages belonging to ongoing transactions will be withheld until the relevant transaction has been completed. As a result, read_committed consumers will not be able to read up to the high watermark when there are in flight transactions.
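As a minimal sketch of what this setting means for a plain (non-Spring) consumer (the bootstrap address, group id, and topic below are illustrative, not from the original application):

import java.time.Duration;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class ReadCommittedConsumer {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // illustrative
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "demo-group");              // illustrative
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // Skip records from aborted transactions; the default is "read_uncommitted".
        props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(List.of("PRICE-TOPIC")); // illustrative topic
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5));
            records.forEach(r -> System.out.println(r.value()));
        }
    }
}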

When using the Spring Boot property, the value is read-committed (with a hyphen), not read_committed.

spring.kafka.consumer.isolation-level=read-committed

Your IDE should suggest proper values.

Or

spring.kafka.consumer.properties[isolation.level]=read_committed

EDIT

(Although I see that Boot works with read_uncommitted too).

This works as expected for me.

import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.TopicBuilder;
import org.springframework.kafka.core.KafkaTemplate;

@SpringBootApplication
public class So71591355Application {

    public static void main(String[] args) {
        SpringApplication.run(So71591355Application.class, args);
    }

    @KafkaListener(id = "so71591355", topics = "so71591355")
    void listen1(String in) {
        System.out.println("committed: " + in);
    }

    @KafkaListener(id = "so71591355-2", topics = "so71591355",
            properties = "isolation.level:read_uncommitted")
    void listen2(String in) {
        System.out.println("uncommitted: " + in);
    }

    @Bean
    public NewTopic topic() {
        return TopicBuilder.name("so71591355").partitions(1).replicas(1).build();
    }

    @Bean
    ApplicationRunner runner(KafkaTemplate<String, String> template) {
        template.setAllowNonTransactional(true);
        return args -> {
            template.send("so71591355", "non-transactional");
            try {
                template.executeInTransaction(t -> {
                    t.send("so71591355", "first");
                    t.send("so71591355", "second");
                    t.send("so71591355", new String(new byte[2000000]));
                    return null;
                });
            }
            catch (Exception e) {
                e.printStackTrace();
            }
        };
    }
}
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.isolation-level=read-committed

spring.kafka.producer.transaction-id-prefix=tx-

  .   ____          _            __ _ _
 /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
 \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
  '  |____| .__|_| |_|_| |_\__, | / / / /
 =========|_|==============|___/=/_/_/_/
 :: Spring Boot ::                (v2.6.4)

org.springframework.kafka.KafkaException: Send failed; nested exception is org.apache.kafka.common.errors.RecordTooLargeException: The message is 2000088 bytes when serialized which is larger than 1048576, which is the value of the max.request.size configuration.
    at org.springframework.kafka.core.KafkaTemplate.doSend(KafkaTemplate.java:660)
    at org.springframework.kafka.core.KafkaTemplate.send(KafkaTemplate.java:403)
    at com.example.demo.So71591355Application.lambda$1(So71591355Application.java:49)
    at org.springframework.kafka.core.KafkaTemplate.executeInTransaction(KafkaTemplate.java:507)
    at com.example.demo.So71591355Application.lambda$0(So71591355Application.java:44)
    at org.springframework.boot.SpringApplication.callRunner(SpringApplication.java:768)
    at org.springframework.boot.SpringApplication.callRunners(SpringApplication.java:758)
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:310)
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1312)
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1301)
    at com.example.demo.So71591355Application.main(So71591355Application.java:19)
Caused by: org.apache.kafka.common.errors.RecordTooLargeException: The message is 2000088 bytes when serialized which is larger than 1048576, which is the value of the max.request.size configuration.
uncommitted: non-transactional
committed: non-transactional
uncommitted: first
uncommitted: second

EDIT2

Your application is working as expected; when I add

@KafkaListener(id = "otherApp", topics =  { "ITEM-TOPIC", "INVENTORY-TOPIC", "PRICE-TOPIC" })
void listen3(String in, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
    System.out.println("so71591355 from " + topic + ": " + in);
}

to another application, it receives no data.

2022-03-24 10:04:57.939 INFO 15038 --- [ hisApp-0-C-1] o.s.k.l.KafkaMessageListenerContainer : otherApp: partitions assigned: [PRICE-TOPIC-0, ITEM-TOPIC-0, INVENTORY-TOPIC-0]

Of course, with a console consumer, we see the messages because the console consumer is not read_committed.
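(Assuming a standard Kafka distribution, the console consumer can be told to skip aborted records via its --isolation-level option; the broker address and topic here are illustrative:)

kafka-console-consumer.sh --bootstrap-server localhost:9092 \
  --topic PRICE-TOPIC --from-beginning --isolation-level read_committed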

And when I comment out the price send, I see

so71591355 from INVENTORY-TOPIC: Inventory data : My test Message
so71591355 from ITEM-TOPIC: Item data : My test Message
...

EDIT3

To customize the after-rollback processor, simply add it as a @Bean and Boot will wire it into the container factory.

@Bean
AfterRollbackProcessor<Object, Object> arp() {
    return new DefaultAfterRollbackProcessor<>((rec, ex) -> {
        log.error("Failed to process {} from topic, partition {}-{}, @{}",
                rec.value(), rec.topic(), rec.partition(), rec.offset(), ex);
    }, new FixedBackOff(3000L, 2));
}

However, you should remove the executeInTransaction call and just do the sends directly on the template. That way, the template will participate in the container's transaction instead of starting a new one.
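A minimal sketch of that shape, assuming spring.kafka.producer.transaction-id-prefix is set so the container starts a transaction around the listener (the class name, listener id, and inbound topic are hypothetical; the three outbound topics are taken from the logs above):

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

@Component
public class RawItemForwarder { // hypothetical name

    private final KafkaTemplate<String, String> template;

    public RawItemForwarder(KafkaTemplate<String, String> template) {
        this.template = template;
    }

    @KafkaListener(id = "rawItem", topics = "RAW-ITEM-TOPIC") // inbound topic is illustrative
    public void receive(String data) {
        // No executeInTransaction here: these sends join the transaction the
        // listener container already started, so if the PRICE-TOPIC send
        // throws (e.g. RecordTooLargeException), all three are aborted together.
        template.send("ITEM-TOPIC", data);
        template.send("INVENTORY-TOPIC", data);
        template.send("PRICE-TOPIC", data);
    }
}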

The arp() example above just logs the error; you can add a DeadLetterPublishingRecoverer (or any custom recoverer) instead, as sketched below.
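A sketch under the assumption that a suitably configured KafkaOperations is available for dead-letter publishing (the 3-second/2-retry back-off mirrors the example above; failed records go to <originalTopic>.DLT, spring-kafka's default naming):

import org.springframework.context.annotation.Bean;
import org.springframework.kafka.core.KafkaOperations;
import org.springframework.kafka.listener.AfterRollbackProcessor;
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
import org.springframework.kafka.listener.DefaultAfterRollbackProcessor;
import org.springframework.util.backoff.FixedBackOff;

@Bean
AfterRollbackProcessor<Object, Object> arp(KafkaOperations<Object, Object> template) {
    // After the back-off retries are exhausted, publish the failed record
    // to <originalTopic>.DLT instead of just logging it.
    return new DefaultAfterRollbackProcessor<>(
            new DeadLetterPublishingRecoverer(template),
            new FixedBackOff(3000L, 2));
}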
