Can we process a whole chunk together in a Spring Batch processor?



I have a scenario where there are millions of records in an employee staging table, and I need to enrich the values and store them in an employee final table. I am currently using chunk processing with a chunk size of 10,000.

In the processor, where the enriching has to happen, I want to collect all the employee IDs for the chunk and make one API call per chunk to enrich the values, instead of a million calls for a million records.

I am observing that my reader, which extends RepositoryItemReader (I am using JPA, hence RepositoryItemReader), does not return a List, so processing happens once per item even with a chunk size of 10,000.

Can we get the whole List from the reader and process it at once?
Or is there another approach? I can't realistically make one API call per record.

import java.util.HashMap;
import java.util.Map;

import org.springframework.batch.item.data.RepositoryItemReader;
import org.springframework.data.domain.Sort;

public class EmployeeStagingReader extends RepositoryItemReader<EmployeeStaging> {
    public EmployeeStagingReader(EmployeeStagingRepository repo) {
        super();
        this.setRepository(repo);
        // Page through the staging table via the repository's findAll method.
        this.setMethodName("findAll");
        final Map<String, Sort.Direction> sorts = new HashMap<>();
        sorts.put("ID", Sort.Direction.ASC);
        this.setSort(sorts);
    }
}


import java.util.List;
import org.springframework.batch.item.ItemProcessor;

public class EmployeeProcessor implements ItemProcessor<List<EmployeeStaging>, List<EmployeeFinal>> {
    @Override
    public List<EmployeeFinal> process(List<EmployeeStaging> employees) throws Exception {
        // Want to transform the list of staging employee records into a
        // list of EmployeeFinal records here.
        return null;
    }
}


@Bean
public Step step1() {
    return this.stepBuilderFactory.get("step1")
                .<List<EmployeeStaging>, List<EmployeeFinal>>chunk(1000)
                .reader(employeeStagingReader())
                .processor(employeeProcessor())
                .writer(employeeFinalWriter())
                .build();
}
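
For context beyond the original post: with the Spring Batch 4 style stepBuilderFactory configuration shown above, the writer, unlike the processor, already receives the whole chunk as a List. So one alternative is to do the per-chunk enrichment inside a custom ItemWriter. A minimal sketch, assuming a hypothetical EnrichmentClient with an enrichByIds batch method and a hypothetical EmployeeFinalRepository JPA repository (none of these names come from the question):

import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import org.springframework.batch.item.ItemWriter;

public class EnrichingEmployeeWriter implements ItemWriter<EmployeeStaging> {

    private final EnrichmentClient client;           // hypothetical API client
    private final EmployeeFinalRepository finalRepo; // hypothetical JPA repository

    public EnrichingEmployeeWriter(EnrichmentClient client, EmployeeFinalRepository finalRepo) {
        this.client = client;
        this.finalRepo = finalRepo;
    }

    @Override
    public void write(List<? extends EmployeeStaging> chunk) {
        // Collect the IDs of the whole chunk and enrich them with one API call.
        List<Long> ids = chunk.stream()
                .map(EmployeeStaging::getId)
                .collect(Collectors.toList());
        Map<Long, String> enrichedValues = client.enrichByIds(ids); // assumed signature

        // Map each staging record plus its enriched value to the final entity.
        List<EmployeeFinal> finals = chunk.stream()
                .map(e -> toFinal(e, enrichedValues.get(e.getId())))
                .collect(Collectors.toList());
        finalRepo.saveAll(finals);
    }

    private EmployeeFinal toFinal(EmployeeStaging staging, String enrichedValue) {
        // The transformation logic from the question's processor would go here.
        return new EmployeeFinal(/* ... */);
    }
}

With this, the step could be declared as <EmployeeStaging, EmployeeStaging>chunk(10000) with no processor at all: the reader still emits one item at a time, but the writer sees the full chunk.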

Comments (1)

不即不离 2025-01-16 02:20:42


Try this approach:

  1. Create a Tasklet step before the chunk-based persistence step.
  2. In this Tasklet step, group the employee IDs, make the API calls in batches, and update the enriched values back into the staging table itself. That way you reduce the number of API calls to one per batch.
  3. In the last step, just read the data and write it using the chunk-based approach. A processor is no longer required; a minimal sketch follows.
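
A minimal sketch of that flow, assuming a hypothetical EnrichmentClient (the same assumed enrichByIds batch call as above), a JdbcTemplate, and guessed table and column names; none of these come from the original answer:

import java.util.List;
import java.util.Map;

import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.jdbc.core.JdbcTemplate;

public class EnrichStagingTasklet implements Tasklet {

    private static final int BATCH_SIZE = 10_000;

    private final JdbcTemplate jdbcTemplate;
    private final EnrichmentClient client; // hypothetical API client

    public EnrichStagingTasklet(JdbcTemplate jdbcTemplate, EnrichmentClient client) {
        this.jdbcTemplate = jdbcTemplate;
        this.client = client;
    }

    @Override
    public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) {
        long lastId = 0;
        List<Long> ids;
        do {
            // Page through the staging IDs in keyset order (table/column names assumed).
            ids = jdbcTemplate.queryForList(
                    "SELECT id FROM employee_staging WHERE id > ? ORDER BY id LIMIT ?",
                    Long.class, lastId, BATCH_SIZE);
            if (ids.isEmpty()) {
                break;
            }
            // One API call per batch of IDs instead of one per record.
            Map<Long, String> enriched = client.enrichByIds(ids);
            enriched.forEach((id, value) -> jdbcTemplate.update(
                    "UPDATE employee_staging SET enriched_value = ? WHERE id = ?",
                    value, id));
            lastId = ids.get(ids.size() - 1);
        } while (ids.size() == BATCH_SIZE);
        return RepeatStatus.FINISHED;
    }
}

The job then becomes this Tasklet step followed by a plain read-and-write chunk step over the already-enriched staging rows, with no processor in between.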