How do I correctly add an AsyncItemWriter to a step?

Posted on 2025-02-04 09:42:26

In a Spring Batch job like the following, I'm trying to use an AsyncItemWriter:

@Bean
public Step readWriteStep() throws Exception {
    return stepBuilderFactory.get("readWriteStep")
        .listener(listener)
        .<Data, Data>chunk(10)
        .reader(dataItemReader())
        .writer(dataAsyncWriter())
        .build();
}

@Bean
public AsyncItemWriter<Data> dataAsyncWriter() throws Exception {
    AsyncItemWriter<Data> asyncItemWriter = new AsyncItemWriter<>();
    asyncItemWriter.setDelegate(dataItemWriter);
    asyncItemWriter.afterPropertiesSet();
    return asyncItemWriter;
}

If I try it like this, IntelliJ complains:

Required type: ItemWriter <? super Data>
Provided: AsyncItemWriter <Data>

When I change .<Data, Data>chunk(10) to .<Data, Future<Data>>chunk(10), IntelliJ no longer shows a warning, but when I run the job I get the following exception:

java.lang.ClassCastException: Data cannot be cast to class java.util.concurrent.Future Data is in unnamed module of loader 'app'; 
java.util.concurrent.Future is in module java.base of loader 'bootstrap'

What do the first and second type parameters in .<Data, Data>chunk(10) stand for?

Is the first one the type the processor takes and the second one the type the processor returns?

How do I solve this problem?

Comments (1)

生活了然无味 2025-02-11 09:42:26

Your example should compile if you change the step definition to use the following:

.<Data, Future<Data>>chunk(10)
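
As far as I can tell, this compiles because the first type parameter of chunk is the item type the reader produces (and the processor consumes), while the second is the type the processor returns (and the writer consumes). AsyncItemWriter<Data> is a writer of Future<Data>, so the second parameter has to be Future<Data>. The following is a minimal stdlib-only sketch of that type flow. ChunkTypesSketch, runChunk and demo are hypothetical names made up for illustration; this is not Spring Batch code.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical, simplified stand-in for a chunk-oriented step (not Spring Batch).
class ChunkTypesSketch {

    // I = type the "reader" produces and the "processor" consumes.
    // O = type the "processor" returns and the "writer" consumes.
    static <I, O> void runChunk(List<I> readItems,
                                Function<? super I, ? extends O> processor,
                                Consumer<List<? extends O>> writer) {
        List<O> processed = new ArrayList<>();
        for (I item : readItems) {
            processed.add(processor.apply(item));
        }
        // The writer only ever sees items of the output type O.
        writer.accept(processed);
    }

    // Runs one chunk with I = Integer and O = String and returns what the writer received.
    static List<String> demo() {
        List<String> written = new ArrayList<>();
        ChunkTypesSketch.<Integer, String>runChunk(
                Arrays.asList(1, 2, 3),
                i -> "item-" + i,
                written::addAll);
        return written;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [item-1, item-2, item-3]
    }
}
```

In the question's step, I is Data and the writer expects Future<Data>, so declaring .<Data, Data> makes the writer's type mismatch the step's output type.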

That said, I'm not sure this will work correctly at runtime because the AsyncItemWriter is expected to unwrap items from their enclosing Futures, where these Futures are created by an AsyncItemProcessor.

In other words, AsyncItemWriter and AsyncItemProcessor should be used in conjunction for this pattern to work. Here is a quick example with both of them:

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Future;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.integration.async.AsyncItemProcessor;
import org.springframework.batch.integration.async.AsyncItemWriter;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.support.ListItemReader;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.SimpleAsyncTaskExecutor;

@Configuration
@EnableBatchProcessing
public class SO72477556 {

    // Empty reader used as a placeholder; supply real items here.
    @Bean
    public ItemReader<Data> dataItemReader() {
        return new ListItemReader<Data>(Arrays.asList());
    }


    @Bean
    public ItemProcessor<Data, Data> dataItemProcessor() {
        return new ItemProcessor<Data, Data>() {
            @Override
            public Data process(Data item) throws Exception {
                return item;
            }
        };
    }

    // Wraps the delegate processor: each item is processed on a thread from
    // the task executor and handed on as a Future<Data>.
    @Bean
    public AsyncItemProcessor<Data, Data> asyncDataItemProcessor() {
        AsyncItemProcessor<Data, Data> asyncItemProcessor = new AsyncItemProcessor<>();
        asyncItemProcessor.setDelegate(dataItemProcessor());
        asyncItemProcessor.setTaskExecutor(new SimpleAsyncTaskExecutor());
        return asyncItemProcessor;
    }

    @Bean
    public ItemWriter<Data> dataItemWriter() {
        return new ItemWriter<Data>() {
            @Override
            public void write(List<? extends Data> items) throws Exception {

            }
        };
    }

    // Unwraps each Future<Data> and passes the resulting Data items to the
    // delegate writer.
    @Bean
    public AsyncItemWriter<Data> dataAsyncWriter() throws Exception {
        AsyncItemWriter<Data> asyncItemWriter = new AsyncItemWriter<>();
        asyncItemWriter.setDelegate(dataItemWriter());
        asyncItemWriter.afterPropertiesSet();
        return asyncItemWriter;
    }

    @Bean
    public Step readWriteStep(StepBuilderFactory stepBuilderFactory) throws Exception {
        return stepBuilderFactory.get("readWriteStep")
                // Input type is Data; output type is Future<Data>, which is
                // what AsyncItemProcessor returns and AsyncItemWriter accepts.
                .<Data, Future<Data>>chunk(10)
                .reader(dataItemReader())
                .processor(asyncDataItemProcessor())
                .writer(dataAsyncWriter())
                .build();
    }

    @Bean
    public Job job(JobBuilderFactory jobs, StepBuilderFactory steps) throws Exception {
        return jobs.get("job")
                .start(readWriteStep(steps))
                .build();
    }

    public static void main(String[] args) throws Exception {
        ApplicationContext context = new AnnotationConfigApplicationContext(SO72477556.class);
        JobLauncher jobLauncher = context.getBean(JobLauncher.class);
        Job job = context.getBean(Job.class);
        jobLauncher.run(job, new JobParameters());
    }

    static class Data {}

}
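
To make the pairing more concrete, here is a rough stdlib-only sketch of the two roles: the processor side submits the work to an executor and hands back a Future, and the writer side unwraps each Future before passing plain items on. AsyncPairSketch, processAsync, writeAsync and the upper-casing "processing" step are hypothetical stand-ins chosen for illustration, not the actual Spring Batch classes.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical stand-ins that mimic the roles of an async processor and an async writer.
class AsyncPairSketch {

    // Processor role: run the real work on another thread and return a Future.
    static Future<String> processAsync(ExecutorService executor, String item) {
        Callable<String> task = () -> item.toUpperCase();
        return executor.submit(task);
    }

    // Writer role: wait for each Future, unwrap it, and collect the plain items
    // that a delegate writer would receive.
    static List<String> writeAsync(List<Future<String>> futures) {
        List<String> unwrapped = new ArrayList<>();
        for (Future<String> future : futures) {
            try {
                unwrapped.add(future.get());
            } catch (InterruptedException | ExecutionException e) {
                throw new IllegalStateException("Processing failed", e);
            }
        }
        return unwrapped;
    }

    // Processes one chunk of three items asynchronously, then "writes" it.
    static List<String> demo() {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        try {
            List<Future<String>> chunk = new ArrayList<>();
            for (String item : Arrays.asList("a", "b", "c")) {
                chunk.add(processAsync(executor, item));
            }
            return writeAsync(chunk);
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [A, B, C]
    }
}
```

If the processor role is left out, the writer role receives plain items where it expects Futures, which matches the ClassCastException shown in the question.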