由 Spring Integration 触发时,Spring Batch 失败并出现事务错误
我有一个简单的 Spring Boot (2.6.4) 应用程序,它使用 Spring Batch 和 Spring Integration 从文件读取数据并发出 Kafka 事件。我已将 Spring Integration 配置为轮询可配置目录,并在新文件到达时立即触发 Spring Batch 作业。
一旦作业被“踢”,我就会收到此错误:
在 JobRepository 中检测到现有事务。请修复此问题并重试(例如,从客户端删除 @Transactional 注释)。
我假设这是由以下 Spring Integration 配置触发的:
return IntegrationFlows.from(fileReadingMessageSource,
c -> c.poller(Pollers.fixedDelay(period)
.taskExecutor(taskExecutor)
.maxMessagesPerPoll(maxMessagesPerPoll)
.transactionSynchronizationFactory(transactionSynchronizationFactory())
.transactional(new PseudoTransactionManager())))
TransactionSynchronizationFactory
配置为将传入文件移动到错误或成功目录,基于作业的结果。 我的理解是,Spring Batch Job 不喜欢在 Transaction 的上下文中执行,因为它需要管理自己的事务来处理多个步骤失败。
我还知道我可以将 JobRepositoryFactoryBean 的 setValidateTransactionState 值设置为 false,但我不想弄乱 Spring Batch 内部结构。
正如其他地方所建议的,我也尝试过使用异步作业启动器,但没有成功。
@Bean
public JobLauncher getJobLauncher() {
SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
jobLauncher.setTaskExecutor(new SimpleAsyncTaskExecutor());
jobLauncher.setJobRepository(this.jobRepository);
return jobLauncher;
}
该应用程序正在使用内存中的 H2 数据源,但这似乎不会影响错误。
我无法找到解决这个问题的方法,有什么建议吗?
编辑 - 完整项目在这里: https://github.com/luciano- fiandesio/spring-batch-and-integration-demo
I have a simple Spring Boot (2.6.4) app that uses Spring Batch and Spring Integration to read data from a file and emit Kafka events. I have configured Spring Integration to poll a configurable directory and trigger the Spring Batch Job as soon as a new file arrives.
As soon as the Job is "kicked" I get this error:
Existing transaction detected in JobRepository. Please fix this and try again (e.g. remove @Transactional annotations from client).
I assume this is triggered by the following Spring Integration configuration:
return IntegrationFlows.from(fileReadingMessageSource,
c -> c.poller(Pollers.fixedDelay(period)
.taskExecutor(taskExecutor)
.maxMessagesPerPoll(maxMessagesPerPoll)
.transactionSynchronizationFactory(transactionSynchronizationFactory())
.transactional(new PseudoTransactionManager())))
The TransactionSynchronizationFactory
is configured to move the incoming file into an error or success directory, based on the outcome of the job.
My understanding is that a Spring Batch Job does not like to be executed within the context of a Transaction, since it needs to manage its own transaction to deal with multiple steps failures.
I also understand that I could set the value of setValidateTransactionState
of JobRepositoryFactoryBean
to false
, but I would rather not mess around with the Spring Batch internals.
As suggested elsewhere, I have also tried to use an Async Job Launcher, but no luck.
@Bean
public JobLauncher getJobLauncher() {
SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
jobLauncher.setTaskExecutor(new SimpleAsyncTaskExecutor());
jobLauncher.setJobRepository(this.jobRepository);
return jobLauncher;
}
The app is using an in-memory H2 data-source, but that doesn't seem to affect the error.
I'm unable to find a solution to this problem, any recommendation?
Edit - full project here: https://github.com/luciano-fiandesio/spring-batch-and-integration-demo
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
data:image/s3,"s3://crabby-images/d5906/d59060df4059a6cc364216c4d63ceec29ef7fe66" alt="扫码二维码加入Web技术交流群"
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论