春季批处理重新启动工作失败,交易异常

发布于 2025-01-25 22:09:08 字数 10572 浏览 5 评论 0原文

我正在使用重新启动功能测试我的春季批处理作业,我正在处理块尺寸5,的九个记录,在处理第一个块 - 我故意失败了测试故障情况的第二个大块。不出所料,在成功处理的第一个块处理后,我的批处理失败了 - batch_job_execution我的记录具有执行ID和状态为 。现在,我正在通过传递执行ID来运行重新启动作业,以验证失败的记录是否正在处理。但是,当我运行失败的工作时,我会得到以下例外:

2022-05-03 18:58:44,829 [ JOB=scheduler.id_IS_UNDEFINED ] [THREAD=main] ERROR [RestartJobTasklet] 
Exception java.lang.IllegalStateException: Existing transaction detected in JobRepository. Please fix this and try again (e.g. remove @Transactional annotations from client).while restart the failed job executionId8

您能在这里为我提供帮助 - 我在这里缺少什么。请在下面找到我的代码:

事先感谢您的帮助!

testjobconfig.java

@Configuration
@Profile("myJob-config")
public class TestJobConfig {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Autowired
    private MyItemWriter myItemWriter;

    @Autowired
    private RestartJobTasklet restartJobTasklet;


    @Bean("myJob-config")
    public Job job(@Qualifier("validateStep") Step validateStep,
                   @Qualifier("processRecords") Step processRecords) {
        Job job = jobBuilderFactory.get("myJob-config")
                .incrementer(new RunIdIncrementer())
                .start(validateStep)
                .on("FAILED")
                .end()
                .from(validateStep).on("*").to(processRecords)
                .end()
                .build();
        return job;
    }

    @Bean("restart-myjob")
    public Job restartJob(@Qualifier("restartMyJobStep") Step restartMyJobStep) {
        return jobBuilderFactory.get("restart-myjob")
                .incrementer(new RunIdIncrementer())
                .start(restartMyJobStep)
                .build();
    }

    @Bean(name = "restartMyJobStep")
    public Step restartMyJobStep() {
        return this.stepBuilderFactory.get("restart-failed-job")
                .tasklet(restartJobTasklet)
                .build();
    }


    @Bean(name = "processRecords")
    public Step processRecords() {
        return this.stepBuilderFactory.get("process-csv-records").<Employee, Employee>chunk(5)
                .reader(reader())
                .writer(itemWriter())
                .build();
    }


    @Bean(name = "validateStep")
    public Step validateStep(@Qualifier("validateTasklet") Tasklet validateTasklet) {
        return stepBuilderFactory.get("validateStep")
                .tasklet(validateTasklet)
                .allowStartIfComplete(true)
                .build();
    }

    @Bean(name = "validateTasklet")
    public Tasklet validateTasklet() {
        return new Tasklet() {
            @Override
            public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
                return RepeatStatus.FINISHED;
            }
        };
    }

    @Bean
    public FlatFileItemReader<Employee> reader() {
        FlatFileItemReader<Employee> flatFileItemReader = new FlatFileItemReader<>();

        flatFileItemReader.setLinesToSkip(1);
        flatFileItemReader.setResource(new ClassPathResource("/csv/emps.csv"));

        DefaultLineMapper<Employee> empDefaultLineMapper = new DefaultLineMapper<>();
        DelimitedLineTokenizer lineTokenizer = new DelimitedLineTokenizer();
        lineTokenizer.setNames(new String[]{"id", "firstName", "lastName"});

        empDefaultLineMapper.setLineTokenizer(lineTokenizer);
        empDefaultLineMapper.setFieldSetMapper(new EmployeeFieldSetMapper());
        empDefaultLineMapper.afterPropertiesSet();

        flatFileItemReader.setLineMapper(empDefaultLineMapper);

        return flatFileItemReader;
    }

    @Bean
    public MyItemWriter<Employee> itemWriter() {
        return myItemWriter;
    }
}

restartjobtasklet.java

@Component
public class RestartJobTasklet implements Tasklet, StepExecutionListener {

    @Autowired
    JobExplorer jobExplorer;
    @Autowired
    JobOperator jobOperator;
    private StepExecution stepExecution;
    private JobExecution jobExecution;
    @Autowired
    private OpsJobProperties props;

    @Override
    public void beforeStep(StepExecution stepExecution) {
        this.stepExecution = stepExecution;
        this.jobExecution = stepExecution.getJobExecution();
    }

    @Override
    public RepeatStatus execute(StepContribution stepContribution,
                                ChunkContext chunkContext) throws Exception {

        Long executionId = 8l;
        try {
            Long restartId = jobOperator.restart(executionId);
            JobExecution restartExecution = jobExplorer.getJobExecution(restartId);
        } catch (JobRestartException e) {
            throw e;
        } catch (Exception exception) {
            throw exception;
        }
        return RepeatStatus.FINISHED;
    }

    @Override
    public ExitStatus afterStep(StepExecution stepExecution) {
        return ExitStatus.COMPLETED;
    }
}

dbconfig.java

@Configuration
public class DBConfig extends DefaultBatchConfigurer {

    @Autowired
    public JobBuilderFactory jobBuilderFactory;

    @Bean
    public JobRepository jobRepository(@Autowired DataSource dataSource,
                                       @Autowired PlatformTransactionManager transactionManager) throws Exception {
        JobRepositoryFactoryBean jobRepositoryFactory = new JobRepositoryFactoryBean();
        jobRepositoryFactory.setDatabaseType(DatabaseType.POSTGRES.name());
        jobRepositoryFactory.setDataSource(dataSource);
        jobRepositoryFactory.setTransactionManager(transactionManager);
        jobRepositoryFactory.setIsolationLevelForCreate("ISOLATION_SERIALIZABLE");
        jobRepositoryFactory.setTablePrefix("BATCH_");
        jobRepositoryFactory.setMaxVarCharLength(1000);
        jobRepositoryFactory.setValidateTransactionState(Boolean.FALSE);
        return jobRepositoryFactory.getObject();
    }

    @Bean()
    public DataSource dataSource() {
        PGSimpleDataSource pgSimpleDataSource = new PGSimpleDataSource();
        pgSimpleDataSource.setServerName("my-db-server");
        pgSimpleDataSource.setDatabaseName("test-db");
        pgSimpleDataSource.setUser("test");
        pgSimpleDataSource.setPassword("test");
        return pgSimpleDataSource;
    }

    @Bean
    public JobRegistryBeanPostProcessor jobRegistryBeanPostProcessor(final JobRegistry jobRegistry) {
        JobRegistryBeanPostProcessor postProcessor = new JobRegistryBeanPostProcessor();
        postProcessor.setJobRegistry(jobRegistry);
        return postProcessor;
    }

    @Bean
    public JobOperator jobOperator(final JobLauncher jobLauncher, final JobRepository jobRepository,
                                   final JobRegistry jobRegistry, final JobExplorer jobExplorer) {
        final SimpleJobOperator jobOperator = new SimpleJobOperator();
        jobOperator.setJobLauncher(jobLauncher);
        jobOperator.setJobRepository(jobRepository);
        jobOperator.setJobRegistry(jobRegistry);
        jobOperator.setJobExplorer(jobExplorer);
        return jobOperator;
    }

    @Bean
    public JobExplorer jobExplorer(@Autowired DataSource dataSource) throws Exception {
        final JobExplorerFactoryBean bean = new JobExplorerFactoryBean();
        bean.setDataSource(dataSource);
        bean.setTablePrefix("BATCH_");
        bean.setJdbcOperations(new JdbcTemplate(dataSource));
        bean.afterPropertiesSet();
        return bean.getObject();
    }

    @Bean
    public PlatformTransactionManager transactionManager(@Autowired DataSource dataSource) {
        return new DataSourceTransactionManager(dataSource);
    }

}

错误日志

2022-05-03 18:58:44,865 [ JOB=scheduler.id_IS_UNDEFINED ] [THREAD=main] ERROR [org.springframework.batch.core.step.AbstractStep] 
Encountered an error executing step restart-failed-job in job restart-myjob
java.lang.IllegalStateException: Existing transaction detected in JobRepository. Please fix this and try again (e.g. remove @Transactional annotations from client).
        at org.springframework.batch.core.repository.support.AbstractJobRepositoryFactoryBean$1.invoke(AbstractJobRepositoryFactoryBean.java:177)
        at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
        at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:215)
        at com.sun.proxy.$Proxy68.createJobExecution(Unknown Source)
        at org.springframework.batch.core.launch.support.SimpleJobLauncher.run(SimpleJobLauncher.java:137)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.base/java.lang.reflect.Method.invoke(Method.java:566)
        at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:344)
        at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:198)
        at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163)
        at org.springframework.batch.core.configuration.annotation.SimpleBatchConfiguration$PassthruAdvice.invoke(SimpleBatchConfiguration.java:128)
        at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
        at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:215)
        at com.sun.proxy.$Proxy73.run(Unknown Source)
        at org.springframework.batch.core.launch.support.SimpleJobOperator.restart(SimpleJobOperator.java:283)
        at org.springframework.batch.core.launch.support.SimpleJobOperator$$FastClassBySpringCGLIB$$44ee6049.invoke(<generated>)
        at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:218)
        at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:689)
        at org.springframework.batch.core.launch.support.SimpleJobOperator$$EnhancerBySpringCGLIB$$e5e87de1.restart(<generated>)

I'm testing my spring batch job with restart feature, I'm processing total of nine records with chunk size 5, after processing the first chunk - I'm intentionally failing the second chunk to test the failure scenario. As expected, my batch got failed after the first chunk processed successfully and in my table - batch_job_execution I've the record with execution id and status as FAILED. Now I'm running the restart job by passing the execution id to verify the failed records are processing or not. But I'm getting the below exception when I run the failed job:

2022-05-03 18:58:44,829 [ JOB=scheduler.id_IS_UNDEFINED ] [THREAD=main] ERROR [RestartJobTasklet] 
Exception java.lang.IllegalStateException: Existing transaction detected in JobRepository. Please fix this and try again (e.g. remove @Transactional annotations from client).while restart the failed job executionId8

Could you please assist me here - what I'm missing here. Please find my code below:

Appreciated your help in advance!

TestJobConfig.java

@Configuration
@Profile("myJob-config")
public class TestJobConfig {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Autowired
    private MyItemWriter myItemWriter;

    @Autowired
    private RestartJobTasklet restartJobTasklet;


    @Bean("myJob-config")
    public Job job(@Qualifier("validateStep") Step validateStep,
                   @Qualifier("processRecords") Step processRecords) {
        Job job = jobBuilderFactory.get("myJob-config")
                .incrementer(new RunIdIncrementer())
                .start(validateStep)
                .on("FAILED")
                .end()
                .from(validateStep).on("*").to(processRecords)
                .end()
                .build();
        return job;
    }

    @Bean("restart-myjob")
    public Job restartJob(@Qualifier("restartMyJobStep") Step restartMyJobStep) {
        return jobBuilderFactory.get("restart-myjob")
                .incrementer(new RunIdIncrementer())
                .start(restartMyJobStep)
                .build();
    }

    @Bean(name = "restartMyJobStep")
    public Step restartMyJobStep() {
        return this.stepBuilderFactory.get("restart-failed-job")
                .tasklet(restartJobTasklet)
                .build();
    }


    @Bean(name = "processRecords")
    public Step processRecords() {
        return this.stepBuilderFactory.get("process-csv-records").<Employee, Employee>chunk(5)
                .reader(reader())
                .writer(itemWriter())
                .build();
    }


    @Bean(name = "validateStep")
    public Step validateStep(@Qualifier("validateTasklet") Tasklet validateTasklet) {
        return stepBuilderFactory.get("validateStep")
                .tasklet(validateTasklet)
                .allowStartIfComplete(true)
                .build();
    }

    @Bean(name = "validateTasklet")
    public Tasklet validateTasklet() {
        return new Tasklet() {
            @Override
            public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
                return RepeatStatus.FINISHED;
            }
        };
    }

    @Bean
    public FlatFileItemReader<Employee> reader() {
        FlatFileItemReader<Employee> flatFileItemReader = new FlatFileItemReader<>();

        flatFileItemReader.setLinesToSkip(1);
        flatFileItemReader.setResource(new ClassPathResource("/csv/emps.csv"));

        DefaultLineMapper<Employee> empDefaultLineMapper = new DefaultLineMapper<>();
        DelimitedLineTokenizer lineTokenizer = new DelimitedLineTokenizer();
        lineTokenizer.setNames(new String[]{"id", "firstName", "lastName"});

        empDefaultLineMapper.setLineTokenizer(lineTokenizer);
        empDefaultLineMapper.setFieldSetMapper(new EmployeeFieldSetMapper());
        empDefaultLineMapper.afterPropertiesSet();

        flatFileItemReader.setLineMapper(empDefaultLineMapper);

        return flatFileItemReader;
    }

    @Bean
    public MyItemWriter<Employee> itemWriter() {
        return myItemWriter;
    }
}

RestartJobTasklet.java

@Component
public class RestartJobTasklet implements Tasklet, StepExecutionListener {

    @Autowired
    JobExplorer jobExplorer;
    @Autowired
    JobOperator jobOperator;
    private StepExecution stepExecution;
    private JobExecution jobExecution;
    @Autowired
    private OpsJobProperties props;

    @Override
    public void beforeStep(StepExecution stepExecution) {
        this.stepExecution = stepExecution;
        this.jobExecution = stepExecution.getJobExecution();
    }

    @Override
    public RepeatStatus execute(StepContribution stepContribution,
                                ChunkContext chunkContext) throws Exception {

        Long executionId = 8l;
        try {
            Long restartId = jobOperator.restart(executionId);
            JobExecution restartExecution = jobExplorer.getJobExecution(restartId);
        } catch (JobRestartException e) {
            throw e;
        } catch (Exception exception) {
            throw exception;
        }
        return RepeatStatus.FINISHED;
    }

    @Override
    public ExitStatus afterStep(StepExecution stepExecution) {
        return ExitStatus.COMPLETED;
    }
}

DBConfig.java

@Configuration
public class DBConfig extends DefaultBatchConfigurer {

    @Autowired
    public JobBuilderFactory jobBuilderFactory;

    @Bean
    public JobRepository jobRepository(@Autowired DataSource dataSource,
                                       @Autowired PlatformTransactionManager transactionManager) throws Exception {
        JobRepositoryFactoryBean jobRepositoryFactory = new JobRepositoryFactoryBean();
        jobRepositoryFactory.setDatabaseType(DatabaseType.POSTGRES.name());
        jobRepositoryFactory.setDataSource(dataSource);
        jobRepositoryFactory.setTransactionManager(transactionManager);
        jobRepositoryFactory.setIsolationLevelForCreate("ISOLATION_SERIALIZABLE");
        jobRepositoryFactory.setTablePrefix("BATCH_");
        jobRepositoryFactory.setMaxVarCharLength(1000);
        jobRepositoryFactory.setValidateTransactionState(Boolean.FALSE);
        return jobRepositoryFactory.getObject();
    }

    @Bean()
    public DataSource dataSource() {
        PGSimpleDataSource pgSimpleDataSource = new PGSimpleDataSource();
        pgSimpleDataSource.setServerName("my-db-server");
        pgSimpleDataSource.setDatabaseName("test-db");
        pgSimpleDataSource.setUser("test");
        pgSimpleDataSource.setPassword("test");
        return pgSimpleDataSource;
    }

    @Bean
    public JobRegistryBeanPostProcessor jobRegistryBeanPostProcessor(final JobRegistry jobRegistry) {
        JobRegistryBeanPostProcessor postProcessor = new JobRegistryBeanPostProcessor();
        postProcessor.setJobRegistry(jobRegistry);
        return postProcessor;
    }

    @Bean
    public JobOperator jobOperator(final JobLauncher jobLauncher, final JobRepository jobRepository,
                                   final JobRegistry jobRegistry, final JobExplorer jobExplorer) {
        final SimpleJobOperator jobOperator = new SimpleJobOperator();
        jobOperator.setJobLauncher(jobLauncher);
        jobOperator.setJobRepository(jobRepository);
        jobOperator.setJobRegistry(jobRegistry);
        jobOperator.setJobExplorer(jobExplorer);
        return jobOperator;
    }

    @Bean
    public JobExplorer jobExplorer(@Autowired DataSource dataSource) throws Exception {
        final JobExplorerFactoryBean bean = new JobExplorerFactoryBean();
        bean.setDataSource(dataSource);
        bean.setTablePrefix("BATCH_");
        bean.setJdbcOperations(new JdbcTemplate(dataSource));
        bean.afterPropertiesSet();
        return bean.getObject();
    }

    @Bean
    public PlatformTransactionManager transactionManager(@Autowired DataSource dataSource) {
        return new DataSourceTransactionManager(dataSource);
    }

}

Error Log

2022-05-03 18:58:44,865 [ JOB=scheduler.id_IS_UNDEFINED ] [THREAD=main] ERROR [org.springframework.batch.core.step.AbstractStep] 
Encountered an error executing step restart-failed-job in job restart-myjob
java.lang.IllegalStateException: Existing transaction detected in JobRepository. Please fix this and try again (e.g. remove @Transactional annotations from client).
        at org.springframework.batch.core.repository.support.AbstractJobRepositoryFactoryBean$1.invoke(AbstractJobRepositoryFactoryBean.java:177)
        at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
        at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:215)
        at com.sun.proxy.$Proxy68.createJobExecution(Unknown Source)
        at org.springframework.batch.core.launch.support.SimpleJobLauncher.run(SimpleJobLauncher.java:137)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.base/java.lang.reflect.Method.invoke(Method.java:566)
        at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:344)
        at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:198)
        at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163)
        at org.springframework.batch.core.configuration.annotation.SimpleBatchConfiguration$PassthruAdvice.invoke(SimpleBatchConfiguration.java:128)
        at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
        at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:215)
        at com.sun.proxy.$Proxy73.run(Unknown Source)
        at org.springframework.batch.core.launch.support.SimpleJobOperator.restart(SimpleJobOperator.java:283)
        at org.springframework.batch.core.launch.support.SimpleJobOperator$FastClassBySpringCGLIB$44ee6049.invoke(<generated>)
        at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:218)
        at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:689)
        at org.springframework.batch.core.launch.support.SimpleJobOperator$EnhancerBySpringCGLIB$e5e87de1.restart(<generated>)

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(1

时光清浅 2025-02-01 22:09:08

您正在尝试从另一个作业的任务中重新启动工作。此任务(restartJobtasklet)已经在交易中运行,并调用一种方法,该方法导致另一个使用jobrepository#createJobexecution创建的事务下行。因此错误。换句话说,您不应在交易上下文中使用jobrepository

我不确定创建一个重新启动另一个工作的工作是否是个好主意。重新启动失败的作业实例通常是一项技术任务,不需要工作。我建议设计工作以实施业务逻辑,但不要用于技术任务。

就是说,如果您在main方法中提取重新启动代码,则您的样本应按预期工作。

You are trying to restart a job from within a tasklet in another job. This tasklet (RestartJobTasklet) is already running in a transaction, and calling a method that results in another transaction being created with JobRepository#createJobExecution down the line. Hence the error. In other words, you should not use the JobRepository in a transactional context.

I am not sure if it's a good idea to create a job to restart another job. Restarting a failed job instance is typically a technical task that does not require a job. I would recommend designing jobs to implement business logic, but not for technical tasks.

That said, if you extract the restart code in a main method, your sample should work as expected.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文