由 Spring Integration 触发时,Spring Batch 失败并出现事务错误

发布于 2025-01-15 18:45:02 字数 1519 浏览 4 评论 0原文

我有一个简单的 Spring Boot (2.6.4) 应用程序,它使用 Spring Batch 和 Spring Integration 从文件读取数据并发出 Kafka 事件。我已将 Spring Integration 配置为轮询可配置目录,并在新文件到达时立即触发 Spring Batch 作业。

一旦作业被“踢”,我就会收到此错误:

在 JobRepository 中检测到现有事务。请修复此问题并重试(例如,从客户端删除 @Transactional 注释)。

我假设这是由以下 Spring Integration 配置触发的:

 return IntegrationFlows.from(fileReadingMessageSource,
    c -> c.poller(Pollers.fixedDelay(period)
          .taskExecutor(taskExecutor)
          .maxMessagesPerPoll(maxMessagesPerPoll)
          .transactionSynchronizationFactory(transactionSynchronizationFactory())
          .transactional(new PseudoTransactionManager())))

TransactionSynchronizationFactory 配置为将传入文件移动到错误或成功目录,基于作业的结果。 我的理解是,Spring Batch Job 不喜欢在 Transaction 的上下文中执行,因为它需要管理自己的事务来处理多个步骤失败。

我还知道我可以将 JobRepositoryFactoryBean 的 setValidateTransactionState 值设置为 false,但我不想弄乱 Spring Batch 内部结构。

正如其他地方所建议的,我也尝试过使用异步作业启动器,但没有成功。

@Bean
public JobLauncher getJobLauncher() {
   SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
   jobLauncher.setTaskExecutor(new SimpleAsyncTaskExecutor());
   jobLauncher.setJobRepository(this.jobRepository);
   return jobLauncher;
}

该应用程序正在使用内存中的 H2 数据源,但这似乎不会影响错误。

我无法找到解决这个问题的方法,有什么建议吗?

编辑 - 完整项目在这里: https://github.com/luciano- fiandesio/spring-batch-and-integration-demo

I have a simple Spring Boot (2.6.4) app that uses Spring Batch and Spring Integration to read data from a file and emit Kafka events. I have configured Spring Integration to poll a configurable directory and trigger the Spring Batch Job as soon as a new file arrives.

As soon as the Job is "kicked" I get this error:

Existing transaction detected in JobRepository. Please fix this and try again (e.g. remove @Transactional annotations from client).

I assume this is triggered by the following Spring Integration configuration:

 return IntegrationFlows.from(fileReadingMessageSource,
    c -> c.poller(Pollers.fixedDelay(period)
          .taskExecutor(taskExecutor)
          .maxMessagesPerPoll(maxMessagesPerPoll)
          .transactionSynchronizationFactory(transactionSynchronizationFactory())
          .transactional(new PseudoTransactionManager())))

The TransactionSynchronizationFactory is configured to move the incoming file into an error or success directory, based on the outcome of the job.
My understanding is that a Spring Batch Job does not like to be executed within the context of a Transaction, since it needs to manage its own transaction to deal with multiple steps failures.

I also understand that I could set the value of setValidateTransactionState of JobRepositoryFactoryBean to false, but I would rather not mess around with the Spring Batch internals.

As suggested elsewhere, I have also tried to use an Async Job Launcher, but no luck.

@Bean
public JobLauncher getJobLauncher() {
   SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
   jobLauncher.setTaskExecutor(new SimpleAsyncTaskExecutor());
   jobLauncher.setJobRepository(this.jobRepository);
   return jobLauncher;
}

The app is using an in-memory H2 data-source, but that doesn't seem to affect the error.

I'm unable to find a solution to this problem, any recommendation?

Edit - full project here: https://github.com/luciano-fiandesio/spring-batch-and-integration-demo

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。
列表为空,暂无数据
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文