Spring Batch remote partitioning with Kafka - manager always continues the oldest job in the JobListener
I am using Spring Batch with Spring Boot 2.5.6. I decided to use remote partitioning with Kafka as the middleware. I have one manager and three workers, so one partition has been assigned to the manager's input topic and three partitions to the workers' input topic.
The manager picks up a file, creates multiple executionContexts and sends them over Kafka. The workers process their steps and each sends a reply message at the end of its process. The manager aggregates the workers' results and decides that all workers have finished. So far, so good.
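The topic setup itself is not shown below; as a minimal sketch, the two topics could be declared with Spring Kafka's admin support like this (the partition counts follow the description above, but the class and bean names are illustrative, not code from the original project):

import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

// illustrative sketch, not part of the original project
@Configuration
public class TopicConfiguration {

    @Bean
    public NewTopic requestForWorkersTopic() {
        // three partitions so that each of the three workers gets its own partition
        return new NewTopic("requestForWorkers", 3, (short) 1);
    }

    @Bean
    public NewTopic repliesFromWorkersTopic() {
        // a single partition for the manager's input
        return new NewTopic("repliesFromWorkers", 1, (short) 1);
    }
}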
Now suppose I first run a long-running job that needs a lot of time to finish, and then I run a small job that finishes quickly. Unsurprisingly, the second job finishes earlier and sends its completion signal; the manager consumes this message and continues the process. I even checked the AggregatingMessageHandler: the completion message belongs only to the second job (the short-running one); I verified its jobExecutionId.
Now the problem occurs. I have a JobListener with an afterJob method. This method is called with the first job (the long-running one that the workers are still processing), not the second job (the short-running one that sent the completion signal)! I can tell by looking at the JobExecutionId. This is really strange, because I never saw a completion signal for the first job in the logs.
Some time later, when the first long-running job eventually finishes, the last worker sends a completion message and the manager decides to finish the job, and now the JobListener is called with the second job (the short-running one)!
I don't understand what is going wrong. I would like to assume it is a misconfiguration, but by debugging the code, inspecting the AggregatingMessageHandler and enabling trace logging in the workers and the manager, I can clearly see that the messages are sent and received correctly. There is nothing wrong with the messages. Any suggestion or idea is welcome.
Update
Here is a sample implementation. Suppose we have a Customer table. The job takes minId and maxId as job parameters (the id column in the Customer table is a plain number), and the manager creates multiple ExecutionContexts based on ranges of ids. For example, running the job with minId = 1 and maxId = 10000 makes the manager split that id range into partitions for the workers.
Manager config
package com.example.batchdemo.job;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.core.explore.JobExplorer;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.core.partition.support.Partitioner;
import org.springframework.batch.integration.partition.RemotePartitioningManagerStepBuilderFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Profile;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.kafka.dsl.Kafka;
import org.springframework.integration.scheduling.PollerMetadata;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.ConsumerProperties;
import org.springframework.scheduling.support.PeriodicTrigger;

@Profile("!worker")
@Configuration
public class JobConfiguration {

    private final JobBuilderFactory jobBuilderFactory;
    private final JobExplorer jobExplorer;
    private final RemotePartitioningManagerStepBuilderFactory managerStepBuilderFactory;
    private final JobListener jobListener;

    public JobConfiguration(JobBuilderFactory jobBuilderFactory, JobExplorer jobExplorer, RemotePartitioningManagerStepBuilderFactory managerStepBuilderFactory, JobListener jobListener) {
        this.jobBuilderFactory = jobBuilderFactory;
        this.jobExplorer = jobExplorer;
        this.managerStepBuilderFactory = managerStepBuilderFactory;
        this.jobListener = jobListener;
    }

    @Bean
    public Job job() {
        return jobBuilderFactory.get("job")
                .incrementer(new RunIdIncrementer())
                .start(managerStep())
                .listener(jobListener)
                .build();
    }

    @Bean
    public Step managerStep() {
        // note: no explicit gridSize is set here; the builder's default is what gets
        // passed to the partitioner's partition(gridSize) method
        return managerStepBuilderFactory.get("managerStep")
                .partitioner("workerStep", rangePartitioner(null, null))
                .outputChannel(requestForWorkers())
                .inputChannel(repliesFromWorkers())
                .jobExplorer(jobExplorer)
                .build();
    }

    @Bean
    @StepScope
    public Partitioner rangePartitioner(@Value("#{jobParameters['minId']}") Integer minId,
                                        @Value("#{jobParameters['maxId']}") Integer maxId) {
        return new CustomerIdRangePartitioner(minId, maxId);
    }

    ////////////////////////////////////////////////////////////////////////////////////////////////

    @Bean
    public DirectChannel requestForWorkers() {
        return new DirectChannel();
    }

    @Bean
    public IntegrationFlow outboundFlow(KafkaTemplate kafkaTemplate) {
        // partition requests go out to the workers' input topic;
        // the flow ends at the outbound channel adapter
        return IntegrationFlows
                .from(requestForWorkers())
                .handle(Kafka.outboundChannelAdapter(kafkaTemplate).topic("requestForWorkers"))
                .get();
    }

    @Bean
    public DirectChannel repliesFromWorkers() {
        return new DirectChannel();
    }

    @Bean
    public IntegrationFlow inboundFlow(ConsumerFactory consumerFactory) {
        // worker replies come back on the manager's input topic
        return IntegrationFlows
                .from(Kafka.inboundChannelAdapter(consumerFactory, new ConsumerProperties("repliesFromWorkers")))
                .channel(repliesFromWorkers())
                .get();
    }

    @Bean(name = PollerMetadata.DEFAULT_POLLER)
    public PollerMetadata defaultPoller() {
        PollerMetadata pollerMetadata = new PollerMetadata();
        pollerMetadata.setTrigger(new PeriodicTrigger(10));
        return pollerMetadata;
    }
}
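As far as I understand, when an inputChannel is set on the manager step, the step builder registers an aggregator internally; that is the AggregatingMessageHandler mentioned above, which correlates the workers' replies before the manager decides that the partitioned step is complete.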
Worker config
package com.example.batchdemo.job;

import com.example.batchdemo.domain.Customer;
import com.example.batchdemo.domain.CustomerRowMapper;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.core.step.builder.SimpleStepBuilder;
import org.springframework.batch.integration.partition.RemotePartitioningWorkerStepBuilderFactory;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.database.JdbcPagingItemReader;
import org.springframework.batch.item.database.Order;
import org.springframework.batch.item.database.support.MySqlPagingQueryProvider;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Profile;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.kafka.dsl.Kafka;
import org.springframework.integration.scheduling.PollerMetadata;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.ConsumerProperties;
import org.springframework.scheduling.support.PeriodicTrigger;

import javax.sql.DataSource;
import java.util.HashMap;
import java.util.Map;

@Configuration
@Profile("worker")
public class WorkerConfiguration {

    private static final int CHUNK_SIZE = 10;
    private static final int WAITING_TIME = 3000;

    public final DataSource dataSource;
    private final RemotePartitioningWorkerStepBuilderFactory workerStepBuilderFactory;

    public WorkerConfiguration(DataSource dataSource, RemotePartitioningWorkerStepBuilderFactory workerStepBuilderFactory) {
        this.dataSource = dataSource;
        this.workerStepBuilderFactory = workerStepBuilderFactory;
    }

    @Bean
    public DirectChannel repliesFromWorkers() {
        return new DirectChannel();
    }

    @Bean
    public IntegrationFlow outboundFlow(KafkaTemplate kafkaTemplate) {
        // step execution results are sent back to the manager's input topic
        return IntegrationFlows
                .from(repliesFromWorkers())
                .handle(Kafka.outboundChannelAdapter(kafkaTemplate).topic("repliesFromWorkers"))
                .get();
    }

    @Bean
    public DirectChannel requestForWorkers() {
        return new DirectChannel();
    }

    @Bean
    public IntegrationFlow inboundFlow(ConsumerFactory consumerFactory) {
        // partition requests are consumed from the workers' input topic
        return IntegrationFlows
                .from(Kafka.inboundChannelAdapter(consumerFactory, new ConsumerProperties("requestForWorkers")))
                .channel(requestForWorkers())
                .get();
    }

    @Bean(name = PollerMetadata.DEFAULT_POLLER)
    public PollerMetadata defaultPoller() {
        PollerMetadata pollerMetadata = new PollerMetadata();
        pollerMetadata.setTrigger(new PeriodicTrigger(10));
        return pollerMetadata;
    }

    /////////////////////////////////////////////////////////////////

    @Bean
    public Step workerStep() {
        SimpleStepBuilder workerStepBuilder = workerStepBuilderFactory.get("workerStep")
                .inputChannel(requestForWorkers())
                .outputChannel(repliesFromWorkers())
                .<Customer, Customer>chunk(CHUNK_SIZE)
                .reader(pagingItemReader(null, null))
                .processor(itemProcessor())
                .writer(customerItemWriter());
        return workerStepBuilder.build();
    }

    @Bean
    @StepScope
    public JdbcPagingItemReader<Customer> pagingItemReader(@Value("#{stepExecutionContext['minValue']}") Long minValue,
                                                           @Value("#{stepExecutionContext['maxValue']}") Long maxValue) {
        System.out.println("reading " + minValue + " to " + maxValue);
        JdbcPagingItemReader<Customer> reader = new JdbcPagingItemReader<>();
        reader.setDataSource(this.dataSource);
        reader.setFetchSize(1000);
        reader.setRowMapper(new CustomerRowMapper());

        MySqlPagingQueryProvider queryProvider = new MySqlPagingQueryProvider();
        queryProvider.setSelectClause("id, firstName, lastName, birthdate");
        queryProvider.setFromClause("from CUSTOMER");
        // the upper bound is exclusive; the partitioner produces ranges accordingly
        queryProvider.setWhereClause("where id >= " + minValue + " and id < " + maxValue);

        Map<String, Order> sortKeys = new HashMap<>(1);
        sortKeys.put("id", Order.ASCENDING);
        queryProvider.setSortKeys(sortKeys);

        reader.setQueryProvider(queryProvider);
        return reader;
    }

    @Bean
    @StepScope
    public ItemProcessor<Customer, Customer> itemProcessor() {
        return item -> {
            // artificial delay so that a large id range produces a long-running job
            Thread.sleep(WAITING_TIME);
            System.out.println(item);
            return item;
        };
    }

    @Bean
    @StepScope
    public ItemWriter<Customer> customerItemWriter() {
        return items -> System.out.printf("%d items were written%n", items.size());
    }
}
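Note that with WAITING_TIME = 3000 ms per item and CHUNK_SIZE = 10, every chunk takes roughly 30 seconds, so a job over a large id range is the "long-running" job from the scenario above, while a job over a small range finishes quickly.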
Partitioner
package com.example.batchdemo.job;

import org.springframework.batch.core.partition.support.Partitioner;
import org.springframework.batch.item.ExecutionContext;

import java.util.HashMap;
import java.util.Map;

public class CustomerIdRangePartitioner implements Partitioner {

    private final int minId;
    private final int maxId;

    public CustomerIdRangePartitioner(int minId, int maxId) {
        this.minId = minId;
        this.maxId = maxId;
    }

    @Override
    public Map<String, ExecutionContext> partition(int gridSize) {
        // gridSize is used here as the size of each id range, not as the number of partitions
        int number = (maxId - minId) / gridSize + 1;
        Map<String, ExecutionContext> result = new HashMap<>();
        for (int i = 0; i < number; i++) {
            ExecutionContext executionContext = new ExecutionContext();
            int start = minId + (gridSize * i);
            int end = start + gridSize;
            executionContext.putInt("minValue", start);
            // maxValue is exclusive in the reader's where clause, so go one past maxId
            executionContext.putInt("maxValue", Math.min(end, maxId + 1));
            result.put("partition" + i, executionContext);
        }
        return result;
    }
}
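As a quick sanity check of the partitioning logic (the numbers are only illustrative): with minId = 1, maxId = 100 and gridSize = 30, partition(30) creates four execution contexts covering the ranges [1, 31), [31, 61), [61, 91) and [91, 101), matching the reader's exclusive upper bound.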
JobListener
package com.example.batchdemo.job;

import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobExecutionListener;
import org.springframework.batch.core.configuration.annotation.JobScope;
import org.springframework.stereotype.Component;

@Component
@JobScope
public class JobListener implements JobExecutionListener {

    @Override
    public void beforeJob(JobExecution jobExecution) {
    }

    @Override
    public void afterJob(JobExecution jobExecution) {
        // note: getJobId() returns the id of the enclosing JobInstance;
        // getId() would return the id of this JobExecution
        System.out.println(jobExecution.getJobId() + " was finished: " + jobExecution.getStatus());
    }
}
AppConfiguration
package com.example.batchdemo.controller;

import org.springframework.batch.core.configuration.JobRegistry;
import org.springframework.batch.core.configuration.support.JobRegistryBeanPostProcessor;
import org.springframework.batch.core.converter.DefaultJobParametersConverter;
import org.springframework.batch.core.explore.JobExplorer;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.launch.JobOperator;
import org.springframework.batch.core.launch.support.SimpleJobLauncher;
import org.springframework.batch.core.launch.support.SimpleJobOperator;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

@Configuration
public class AppConfiguration {

    private final JobExplorer jobExplorer;
    private final JobRepository jobRepository;
    private final JobRegistry jobRegistry;
    private final ApplicationContext applicationContext;

    public AppConfiguration(JobExplorer jobExplorer, JobRepository jobRepository, JobRegistry jobRegistry, ApplicationContext applicationContext) {
        this.jobExplorer = jobExplorer;
        this.jobRepository = jobRepository;
        this.jobRegistry = jobRegistry;
        this.applicationContext = applicationContext;
    }

    @Bean
    public synchronized JobRegistryBeanPostProcessor jobRegistrar() throws Exception {
        JobRegistryBeanPostProcessor registrar = new JobRegistryBeanPostProcessor();
        registrar.setJobRegistry(jobRegistry);
        registrar.setBeanFactory(applicationContext.getAutowireCapableBeanFactory());
        registrar.afterPropertiesSet();
        return registrar;
    }

    @Bean
    public JobOperator jobOperator() throws Exception {
        SimpleJobOperator simpleJobOperator = new SimpleJobOperator();
        simpleJobOperator.setJobLauncher(getJobLauncher());
        simpleJobOperator.setJobParametersConverter(new DefaultJobParametersConverter());
        simpleJobOperator.setJobRepository(this.jobRepository);
        simpleJobOperator.setJobExplorer(this.jobExplorer);
        simpleJobOperator.setJobRegistry(this.jobRegistry);
        simpleJobOperator.afterPropertiesSet();
        return simpleJobOperator;
    }

    @Bean
    public JobLauncher getJobLauncher() throws Exception {
        SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
        jobLauncher.setJobRepository(jobRepository);
        // jobs are launched asynchronously, so several jobs can run at the same time
        jobLauncher.setTaskExecutor(jobOperatorExecutor());
        jobLauncher.afterPropertiesSet();
        return jobLauncher;
    }

    @Bean
    public ThreadPoolTaskExecutor jobOperatorExecutor() {
        ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
        threadPoolTaskExecutor.setCorePoolSize(64);
        threadPoolTaskExecutor.setMaxPoolSize(256);
        threadPoolTaskExecutor.setWaitForTasksToCompleteOnShutdown(true);
        return threadPoolTaskExecutor;
    }
}
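The controller that triggers the jobs is not included here. As a minimal sketch of how a job could be started through this JobOperator (the class name, endpoint and parameter mapping are illustrative assumptions):

package com.example.batchdemo.controller;

import org.springframework.batch.core.launch.JobOperator;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

// illustrative sketch; the real controller is not part of the question
@RestController
public class JobController {

    private final JobOperator jobOperator;

    public JobController(JobOperator jobOperator) {
        this.jobOperator = jobOperator;
    }

    @PostMapping("/jobs")
    public Long launch(@RequestParam int minId, @RequestParam int maxId) throws Exception {
        // the async TaskExecutor above makes this return immediately,
        // which is what allows two jobs to run concurrently
        return jobOperator.start("job", String.format("minId=%d,maxId=%d", minId, maxId));
    }
}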
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.5.6</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.example</groupId>
    <artifactId>batch-demo</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>batch-demo</name>
    <description>Demo project for Spring Boot</description>
    <properties>
        <java.version>11</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-batch</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-jdbc</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.batch</groupId>
            <artifactId>spring-batch-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.batch</groupId>
            <artifactId>spring-batch-integration</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-kafka</artifactId>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>
Answer
This is a bug in Spring Batch. The listener is indeed called for the job that finishes earlier, but with the wrong JobExecution instance. Making the JobExecutionListener job-scoped does not solve the issue. I will re-open the issue on GitHub for further investigation.