Kafka 事务回滚不适用于 RecordTooLargeException 的 3 个主题
我向 3 个主题发布 3 条消息 - 发布时如果遇到异常 - 所有消息都将回滚。
但就我而言,当我模拟第三个主题的以下异常时,它不会发生。 消息大小为 117440606 字节
org.apache.kafka.common.errors.RecordTooLargeException:将大消息发布到第三个主题(价格主题)时, - 我以编程方式增加要获取异常的消息的大小。
消息已成功发送到第 1 个 2 主题 - 第三个失败。 - 根据事务,所有消息都必须回滚 - 但主题 1 和 2 始终会获取消息。
但日志显示 - 事务已回滚
如何解决此问题
日志
2022-03-23 21:16:59.690 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO c.a.m.r.producer.KafkaProducer - @@@ --- Sending Data to Item , price, Inventory -----
2022-03-23 21:16:59.733 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] ERROR o.s.k.s.LoggingProducerListener - Exception thrown when sending a message with key='String' and payload='{"sku":"String","lowestOriginalPrice":...' to topic PRICE-TOPIC:
**org.apache.kafka.common.errors.RecordTooLargeException**: The message is 117440606 bytes when serialized which is larger than 1048576, which is the value of the max.request.size configuration.
2022-03-23 21:16:59.733 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO o.a.k.c.producer.KafkaProducer - [Producer clientId=raw-item-producer-client-2, transactionalId=tx-5b01ad71-f754-44e0-9c52-4774a482bc1d0] Aborting incomplete transaction
2022-03-23 21:16:59.737 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO o.a.k.c.producer.KafkaProducer - [Producer clientId=raw-item-producer-client-1, transactionalId=tx-5b01ad71-f754-44e0-9c52-4774a482bc1draw-item-processor-group-id.OSMI_C02_CATALOG_MKPDOMAIN.0] **Aborting incomplete transaction**
2022-03-23 21:16:59.738 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] ERROR o.s.k.l.KafkaMessageListenerContainer - Transaction rolled back
org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method 'public void com.albertsons.mkp.rawitemprocessor.consumer.KafkaConsumer.receive(org.apache.kafka.clients.consumer.ConsumerRecord<java.lang.String, java.lang.String>) throws java.io.IOException' threw exception; nested exception is org.springframework.kafka.KafkaException: Send failed; nested exception is org.apache.kafka.common.errors.RecordTooLargeException: The message is 117440606 bytes when serialized which is larger than 1048576, which is the value of the max.request.size configuration.; nested exception is org.springframework.kafka.KafkaException: Send failed; nested exception is org.apache.kafka.common.errors.RecordTooLargeException: The message is 117440606 bytes when serialized which is larger than 1048576, which is the value of the max.request.size configuration.
2022-03-23 21:17:00.250 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO c.a.m.r.producer.KafkaProducer - @@@ --- Sending Data to Item , price, Inventory -----
2022-03-23 21:17:00.294 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] ERROR o.s.k.s.LoggingProducerListener - Exception thrown when sending a message with key='String' and payload='{"sku":"String","lowestOriginalPrice":"String","lowestPrice":"String","updatedAt":"String","createdA...' to topic PRICE-TOPIC:
org.apache.kafka.common.errors.RecordTooLargeException: The message is 117440606 bytes when serialized which is larger than 1048576, which is the value of the max.request.size configuration.
2022-03-23 21:17:00.295 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO o.a.k.c.producer.KafkaProducer - [Producer clientId=raw-item-producer-client-2, transactionalId=tx-5b01ad71-f754-44e0-9c52-4774a482bc1d0] Aborting incomplete transaction
2022-03-23 21:17:00.298 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO o.a.k.c.producer.KafkaProducer - [Producer clientId=raw-item-producer-client-1, transactionalId=tx-5b01ad71-f754-44e0-9c52-4774a482bc1draw-item-processor-group-id.OSMI_C02_CATALOG_MKPDOMAIN.0] **Aborting incomplete transaction**
2022-03-23 21:17:00.308 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] ERROR o.s.k.l.**KafkaMessageListenerContainer - Transaction rolled back**
org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method 'public void com.albertsons.mkp.rawitemprocessor.consumer.KafkaConsumer.receive(org.apache.kafka.clients.consumer.ConsumerRecord<java.lang.String, java.lang.String>) throws java.io.IOException' threw exception; nested exception is **org.springframework.kafka.KafkaException: Send failed**; nested exception is org.apache.kafka.common.errors.**RecordTooLargeException**: The message is 117440606 bytes when serialized which is larger than 1048576, which is the value of the max.request.size configuration.; nested exception is org.springframework.kafka.KafkaException: Send failed; nested exception is org.apache.kafka.common.errors.RecordTooLargeException: The message is 117440606 bytes when serialized which is larger than 1048576, which is the value of the max.request.size configuration.
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论
评论(1)
回滚的记录保留在日志中。
Kafka 在日志中添加一个标记来指示事务是已提交还是已回滚。
默认情况下,消费者将收到所有记录,即使它们已回滚。
消费者必须配置
isolation.level=read_commissed
以避免看到回滚的记录。https://kafka.apache.org/documentation/#consumerconfigs_isolation.level
使用 Spring Boot 时,它是
已提交
,而不是已提交
。您的 IDE 应建议正确的值。
或者
编辑
(尽管我发现 Boot 也可以与
read_uncommissed
一起使用)。这对我来说是预期的。
EDIT2
您的应用程序正在按预期运行;当我添加
到另一个应用程序时,它没有收到任何数据。
当然,对于控制台消费者,我们会看到消息,因为控制台消费者不是
已提交读取
。当我注释掉价格发送时;我看到
EDIT3
自定义回滚后处理器;只需将其添加为
@Bean
,Boot 就会将其连接到容器工厂。但是,您应该删除
excuteInTransaction
调用并直接在模板上执行发送。这样,模板将参与容器的事务,而不是启动一个新的事务。这个例子只是记录错误;您可以添加
DeadLetterPublishingRecoverer
(或任何自定义恢复程序)。Rolled back records remain in the log.
Kafka adds a marker to the log to indicate whether the transaction was committed or rolled back.
By default, consumers will receive all records, even if they are rolled back.
Consumers must be configured with
isolation.level=read_committed
to avoid seeing rolled back records.https://kafka.apache.org/documentation/#consumerconfigs_isolation.level
When using Spring Boot, it's
read-committed
, notread_committed
.Your IDE should suggest proper values.
Or
EDIT
(Although I see that Boot works with
read_uncommitted
too).This works as expected for me.
EDIT2
Your application is working as expected; when I add
to another application, it receives no data.
Of course, with a console consumer, we see the messages because the console consumer is not
read_committed
.And when I comment out the price send; I see
EDIT3
To customize the after rollback processor; simply add it as a
@Bean
and Boot will wire it into the container factory.However, you should remove the
excuteInTransaction
call and just do the sends directly on the template. That way, the template will participate in the container's transaction instead of starting a new one.This example just logs the error; you can add
DeadLetterPublishingRecoverer
(or any custom recoverer).