春季批处理重新启动工作失败,交易异常
我正在使用重新启动
功能测试我的春季批处理作业,我正在处理块尺寸5,
的九个记录,在处理第一个块 - 我故意失败了测试故障情况的第二个大块。不出所料,在成功处理的第一个块处理后,我的批处理失败了 - batch_job_execution
我的记录具有执行ID和状态为 。现在,我正在通过传递执行ID来运行重新启动作业,以验证失败的记录是否正在处理。但是,当我运行失败的工作时,我会得到以下例外:
2022-05-03 18:58:44,829 [ JOB=scheduler.id_IS_UNDEFINED ] [THREAD=main] ERROR [RestartJobTasklet]
Exception java.lang.IllegalStateException: Existing transaction detected in JobRepository. Please fix this and try again (e.g. remove @Transactional annotations from client).while restart the failed job executionId8
您能在这里为我提供帮助 - 我在这里缺少什么。请在下面找到我的代码:
事先感谢您的帮助!
testjobconfig.java
@Configuration
@Profile("myJob-config")
public class TestJobConfig {
@Autowired
private JobBuilderFactory jobBuilderFactory;
@Autowired
private StepBuilderFactory stepBuilderFactory;
@Autowired
private MyItemWriter myItemWriter;
@Autowired
private RestartJobTasklet restartJobTasklet;
@Bean("myJob-config")
public Job job(@Qualifier("validateStep") Step validateStep,
@Qualifier("processRecords") Step processRecords) {
Job job = jobBuilderFactory.get("myJob-config")
.incrementer(new RunIdIncrementer())
.start(validateStep)
.on("FAILED")
.end()
.from(validateStep).on("*").to(processRecords)
.end()
.build();
return job;
}
@Bean("restart-myjob")
public Job restartJob(@Qualifier("restartMyJobStep") Step restartMyJobStep) {
return jobBuilderFactory.get("restart-myjob")
.incrementer(new RunIdIncrementer())
.start(restartMyJobStep)
.build();
}
@Bean(name = "restartMyJobStep")
public Step restartMyJobStep() {
return this.stepBuilderFactory.get("restart-failed-job")
.tasklet(restartJobTasklet)
.build();
}
@Bean(name = "processRecords")
public Step processRecords() {
return this.stepBuilderFactory.get("process-csv-records").<Employee, Employee>chunk(5)
.reader(reader())
.writer(itemWriter())
.build();
}
@Bean(name = "validateStep")
public Step validateStep(@Qualifier("validateTasklet") Tasklet validateTasklet) {
return stepBuilderFactory.get("validateStep")
.tasklet(validateTasklet)
.allowStartIfComplete(true)
.build();
}
@Bean(name = "validateTasklet")
public Tasklet validateTasklet() {
return new Tasklet() {
@Override
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
return RepeatStatus.FINISHED;
}
};
}
@Bean
public FlatFileItemReader<Employee> reader() {
FlatFileItemReader<Employee> flatFileItemReader = new FlatFileItemReader<>();
flatFileItemReader.setLinesToSkip(1);
flatFileItemReader.setResource(new ClassPathResource("/csv/emps.csv"));
DefaultLineMapper<Employee> empDefaultLineMapper = new DefaultLineMapper<>();
DelimitedLineTokenizer lineTokenizer = new DelimitedLineTokenizer();
lineTokenizer.setNames(new String[]{"id", "firstName", "lastName"});
empDefaultLineMapper.setLineTokenizer(lineTokenizer);
empDefaultLineMapper.setFieldSetMapper(new EmployeeFieldSetMapper());
empDefaultLineMapper.afterPropertiesSet();
flatFileItemReader.setLineMapper(empDefaultLineMapper);
return flatFileItemReader;
}
@Bean
public MyItemWriter<Employee> itemWriter() {
return myItemWriter;
}
}
restartjobtasklet.java
@Component
public class RestartJobTasklet implements Tasklet, StepExecutionListener {
@Autowired
JobExplorer jobExplorer;
@Autowired
JobOperator jobOperator;
private StepExecution stepExecution;
private JobExecution jobExecution;
@Autowired
private OpsJobProperties props;
@Override
public void beforeStep(StepExecution stepExecution) {
this.stepExecution = stepExecution;
this.jobExecution = stepExecution.getJobExecution();
}
@Override
public RepeatStatus execute(StepContribution stepContribution,
ChunkContext chunkContext) throws Exception {
Long executionId = 8l;
try {
Long restartId = jobOperator.restart(executionId);
JobExecution restartExecution = jobExplorer.getJobExecution(restartId);
} catch (JobRestartException e) {
throw e;
} catch (Exception exception) {
throw exception;
}
return RepeatStatus.FINISHED;
}
@Override
public ExitStatus afterStep(StepExecution stepExecution) {
return ExitStatus.COMPLETED;
}
}
dbconfig.java
@Configuration
public class DBConfig extends DefaultBatchConfigurer {
@Autowired
public JobBuilderFactory jobBuilderFactory;
@Bean
public JobRepository jobRepository(@Autowired DataSource dataSource,
@Autowired PlatformTransactionManager transactionManager) throws Exception {
JobRepositoryFactoryBean jobRepositoryFactory = new JobRepositoryFactoryBean();
jobRepositoryFactory.setDatabaseType(DatabaseType.POSTGRES.name());
jobRepositoryFactory.setDataSource(dataSource);
jobRepositoryFactory.setTransactionManager(transactionManager);
jobRepositoryFactory.setIsolationLevelForCreate("ISOLATION_SERIALIZABLE");
jobRepositoryFactory.setTablePrefix("BATCH_");
jobRepositoryFactory.setMaxVarCharLength(1000);
jobRepositoryFactory.setValidateTransactionState(Boolean.FALSE);
return jobRepositoryFactory.getObject();
}
@Bean()
public DataSource dataSource() {
PGSimpleDataSource pgSimpleDataSource = new PGSimpleDataSource();
pgSimpleDataSource.setServerName("my-db-server");
pgSimpleDataSource.setDatabaseName("test-db");
pgSimpleDataSource.setUser("test");
pgSimpleDataSource.setPassword("test");
return pgSimpleDataSource;
}
@Bean
public JobRegistryBeanPostProcessor jobRegistryBeanPostProcessor(final JobRegistry jobRegistry) {
JobRegistryBeanPostProcessor postProcessor = new JobRegistryBeanPostProcessor();
postProcessor.setJobRegistry(jobRegistry);
return postProcessor;
}
@Bean
public JobOperator jobOperator(final JobLauncher jobLauncher, final JobRepository jobRepository,
final JobRegistry jobRegistry, final JobExplorer jobExplorer) {
final SimpleJobOperator jobOperator = new SimpleJobOperator();
jobOperator.setJobLauncher(jobLauncher);
jobOperator.setJobRepository(jobRepository);
jobOperator.setJobRegistry(jobRegistry);
jobOperator.setJobExplorer(jobExplorer);
return jobOperator;
}
@Bean
public JobExplorer jobExplorer(@Autowired DataSource dataSource) throws Exception {
final JobExplorerFactoryBean bean = new JobExplorerFactoryBean();
bean.setDataSource(dataSource);
bean.setTablePrefix("BATCH_");
bean.setJdbcOperations(new JdbcTemplate(dataSource));
bean.afterPropertiesSet();
return bean.getObject();
}
@Bean
public PlatformTransactionManager transactionManager(@Autowired DataSource dataSource) {
return new DataSourceTransactionManager(dataSource);
}
}
错误日志
2022-05-03 18:58:44,865 [ JOB=scheduler.id_IS_UNDEFINED ] [THREAD=main] ERROR [org.springframework.batch.core.step.AbstractStep]
Encountered an error executing step restart-failed-job in job restart-myjob
java.lang.IllegalStateException: Existing transaction detected in JobRepository. Please fix this and try again (e.g. remove @Transactional annotations from client).
at org.springframework.batch.core.repository.support.AbstractJobRepositoryFactoryBean$1.invoke(AbstractJobRepositoryFactoryBean.java:177)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:215)
at com.sun.proxy.$Proxy68.createJobExecution(Unknown Source)
at org.springframework.batch.core.launch.support.SimpleJobLauncher.run(SimpleJobLauncher.java:137)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:344)
at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:198)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163)
at org.springframework.batch.core.configuration.annotation.SimpleBatchConfiguration$PassthruAdvice.invoke(SimpleBatchConfiguration.java:128)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:215)
at com.sun.proxy.$Proxy73.run(Unknown Source)
at org.springframework.batch.core.launch.support.SimpleJobOperator.restart(SimpleJobOperator.java:283)
at org.springframework.batch.core.launch.support.SimpleJobOperator$$FastClassBySpringCGLIB$$44ee6049.invoke(<generated>)
at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:218)
at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:689)
at org.springframework.batch.core.launch.support.SimpleJobOperator$$EnhancerBySpringCGLIB$$e5e87de1.restart(<generated>)
I'm testing my spring batch job with restart
feature, I'm processing total of nine records with chunk size 5,
after processing the first chunk - I'm intentionally failing the second chunk to test the failure scenario. As expected, my batch got failed after the first chunk processed successfully and in my table - batch_job_execution
I've the record with execution id and status as FAILED
. Now I'm running the restart job by passing the execution id to verify the failed records are processing or not. But I'm getting the below exception when I run the failed job:
2022-05-03 18:58:44,829 [ JOB=scheduler.id_IS_UNDEFINED ] [THREAD=main] ERROR [RestartJobTasklet]
Exception java.lang.IllegalStateException: Existing transaction detected in JobRepository. Please fix this and try again (e.g. remove @Transactional annotations from client).while restart the failed job executionId8
Could you please assist me here - what I'm missing here. Please find my code below:
Appreciated your help in advance!
TestJobConfig.java
@Configuration
@Profile("myJob-config")
public class TestJobConfig {
@Autowired
private JobBuilderFactory jobBuilderFactory;
@Autowired
private StepBuilderFactory stepBuilderFactory;
@Autowired
private MyItemWriter myItemWriter;
@Autowired
private RestartJobTasklet restartJobTasklet;
@Bean("myJob-config")
public Job job(@Qualifier("validateStep") Step validateStep,
@Qualifier("processRecords") Step processRecords) {
Job job = jobBuilderFactory.get("myJob-config")
.incrementer(new RunIdIncrementer())
.start(validateStep)
.on("FAILED")
.end()
.from(validateStep).on("*").to(processRecords)
.end()
.build();
return job;
}
@Bean("restart-myjob")
public Job restartJob(@Qualifier("restartMyJobStep") Step restartMyJobStep) {
return jobBuilderFactory.get("restart-myjob")
.incrementer(new RunIdIncrementer())
.start(restartMyJobStep)
.build();
}
@Bean(name = "restartMyJobStep")
public Step restartMyJobStep() {
return this.stepBuilderFactory.get("restart-failed-job")
.tasklet(restartJobTasklet)
.build();
}
@Bean(name = "processRecords")
public Step processRecords() {
return this.stepBuilderFactory.get("process-csv-records").<Employee, Employee>chunk(5)
.reader(reader())
.writer(itemWriter())
.build();
}
@Bean(name = "validateStep")
public Step validateStep(@Qualifier("validateTasklet") Tasklet validateTasklet) {
return stepBuilderFactory.get("validateStep")
.tasklet(validateTasklet)
.allowStartIfComplete(true)
.build();
}
@Bean(name = "validateTasklet")
public Tasklet validateTasklet() {
return new Tasklet() {
@Override
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
return RepeatStatus.FINISHED;
}
};
}
@Bean
public FlatFileItemReader<Employee> reader() {
FlatFileItemReader<Employee> flatFileItemReader = new FlatFileItemReader<>();
flatFileItemReader.setLinesToSkip(1);
flatFileItemReader.setResource(new ClassPathResource("/csv/emps.csv"));
DefaultLineMapper<Employee> empDefaultLineMapper = new DefaultLineMapper<>();
DelimitedLineTokenizer lineTokenizer = new DelimitedLineTokenizer();
lineTokenizer.setNames(new String[]{"id", "firstName", "lastName"});
empDefaultLineMapper.setLineTokenizer(lineTokenizer);
empDefaultLineMapper.setFieldSetMapper(new EmployeeFieldSetMapper());
empDefaultLineMapper.afterPropertiesSet();
flatFileItemReader.setLineMapper(empDefaultLineMapper);
return flatFileItemReader;
}
@Bean
public MyItemWriter<Employee> itemWriter() {
return myItemWriter;
}
}
RestartJobTasklet.java
@Component
public class RestartJobTasklet implements Tasklet, StepExecutionListener {
@Autowired
JobExplorer jobExplorer;
@Autowired
JobOperator jobOperator;
private StepExecution stepExecution;
private JobExecution jobExecution;
@Autowired
private OpsJobProperties props;
@Override
public void beforeStep(StepExecution stepExecution) {
this.stepExecution = stepExecution;
this.jobExecution = stepExecution.getJobExecution();
}
@Override
public RepeatStatus execute(StepContribution stepContribution,
ChunkContext chunkContext) throws Exception {
Long executionId = 8l;
try {
Long restartId = jobOperator.restart(executionId);
JobExecution restartExecution = jobExplorer.getJobExecution(restartId);
} catch (JobRestartException e) {
throw e;
} catch (Exception exception) {
throw exception;
}
return RepeatStatus.FINISHED;
}
@Override
public ExitStatus afterStep(StepExecution stepExecution) {
return ExitStatus.COMPLETED;
}
}
DBConfig.java
@Configuration
public class DBConfig extends DefaultBatchConfigurer {
@Autowired
public JobBuilderFactory jobBuilderFactory;
@Bean
public JobRepository jobRepository(@Autowired DataSource dataSource,
@Autowired PlatformTransactionManager transactionManager) throws Exception {
JobRepositoryFactoryBean jobRepositoryFactory = new JobRepositoryFactoryBean();
jobRepositoryFactory.setDatabaseType(DatabaseType.POSTGRES.name());
jobRepositoryFactory.setDataSource(dataSource);
jobRepositoryFactory.setTransactionManager(transactionManager);
jobRepositoryFactory.setIsolationLevelForCreate("ISOLATION_SERIALIZABLE");
jobRepositoryFactory.setTablePrefix("BATCH_");
jobRepositoryFactory.setMaxVarCharLength(1000);
jobRepositoryFactory.setValidateTransactionState(Boolean.FALSE);
return jobRepositoryFactory.getObject();
}
@Bean()
public DataSource dataSource() {
PGSimpleDataSource pgSimpleDataSource = new PGSimpleDataSource();
pgSimpleDataSource.setServerName("my-db-server");
pgSimpleDataSource.setDatabaseName("test-db");
pgSimpleDataSource.setUser("test");
pgSimpleDataSource.setPassword("test");
return pgSimpleDataSource;
}
@Bean
public JobRegistryBeanPostProcessor jobRegistryBeanPostProcessor(final JobRegistry jobRegistry) {
JobRegistryBeanPostProcessor postProcessor = new JobRegistryBeanPostProcessor();
postProcessor.setJobRegistry(jobRegistry);
return postProcessor;
}
@Bean
public JobOperator jobOperator(final JobLauncher jobLauncher, final JobRepository jobRepository,
final JobRegistry jobRegistry, final JobExplorer jobExplorer) {
final SimpleJobOperator jobOperator = new SimpleJobOperator();
jobOperator.setJobLauncher(jobLauncher);
jobOperator.setJobRepository(jobRepository);
jobOperator.setJobRegistry(jobRegistry);
jobOperator.setJobExplorer(jobExplorer);
return jobOperator;
}
@Bean
public JobExplorer jobExplorer(@Autowired DataSource dataSource) throws Exception {
final JobExplorerFactoryBean bean = new JobExplorerFactoryBean();
bean.setDataSource(dataSource);
bean.setTablePrefix("BATCH_");
bean.setJdbcOperations(new JdbcTemplate(dataSource));
bean.afterPropertiesSet();
return bean.getObject();
}
@Bean
public PlatformTransactionManager transactionManager(@Autowired DataSource dataSource) {
return new DataSourceTransactionManager(dataSource);
}
}
Error Log
2022-05-03 18:58:44,865 [ JOB=scheduler.id_IS_UNDEFINED ] [THREAD=main] ERROR [org.springframework.batch.core.step.AbstractStep]
Encountered an error executing step restart-failed-job in job restart-myjob
java.lang.IllegalStateException: Existing transaction detected in JobRepository. Please fix this and try again (e.g. remove @Transactional annotations from client).
at org.springframework.batch.core.repository.support.AbstractJobRepositoryFactoryBean$1.invoke(AbstractJobRepositoryFactoryBean.java:177)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:215)
at com.sun.proxy.$Proxy68.createJobExecution(Unknown Source)
at org.springframework.batch.core.launch.support.SimpleJobLauncher.run(SimpleJobLauncher.java:137)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:344)
at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:198)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163)
at org.springframework.batch.core.configuration.annotation.SimpleBatchConfiguration$PassthruAdvice.invoke(SimpleBatchConfiguration.java:128)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:215)
at com.sun.proxy.$Proxy73.run(Unknown Source)
at org.springframework.batch.core.launch.support.SimpleJobOperator.restart(SimpleJobOperator.java:283)
at org.springframework.batch.core.launch.support.SimpleJobOperator$FastClassBySpringCGLIB$44ee6049.invoke(<generated>)
at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:218)
at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:689)
at org.springframework.batch.core.launch.support.SimpleJobOperator$EnhancerBySpringCGLIB$e5e87de1.restart(<generated>)
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论
评论(1)
您正在尝试从另一个作业的任务中重新启动工作。此任务(
restartJobtasklet
)已经在交易中运行,并调用一种方法,该方法导致另一个使用jobrepository#createJobexecution创建的事务
下行。因此错误。换句话说,您不应在交易上下文中使用jobrepository
。我不确定创建一个重新启动另一个工作的工作是否是个好主意。重新启动失败的作业实例通常是一项技术任务,不需要工作。我建议设计工作以实施业务逻辑,但不要用于技术任务。
就是说,如果您在
main
方法中提取重新启动代码,则您的样本应按预期工作。You are trying to restart a job from within a tasklet in another job. This tasklet (
RestartJobTasklet
) is already running in a transaction, and calling a method that results in another transaction being created withJobRepository#createJobExecution
down the line. Hence the error. In other words, you should not use theJobRepository
in a transactional context.I am not sure if it's a good idea to create a job to restart another job. Restarting a failed job instance is typically a technical task that does not require a job. I would recommend designing jobs to implement business logic, but not for technical tasks.
That said, if you extract the restart code in a
main
method, your sample should work as expected.