Spring Batch remote partitioning with Kafka - manager always completes the oldest job in the JobListener

Posted on 2025-02-07 08:09:36

I'm using Spring Batch along with Spring Boot 2.5.6. I decided to use remote partitioning with Kafka as the middleware. I have one manager and three workers. Accordingly, one partition has been assigned to the manager's input topic and three partitions to the workers' input topic.
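For reference, the topics with those partition counts could be declared as NewTopic beans. This is only a minimal sketch assuming Spring Kafka's TopicBuilder and a single-broker setup (replication factor 1); the topic names match the flows shown below.

import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.TopicBuilder;

@Configuration
public class TopicConfiguration {

    @Bean
    public NewTopic requestForWorkersTopic() {
        // workers' input: three partitions, one per worker instance (assumed setup)
        return TopicBuilder.name("requestForWorkers").partitions(3).replicas(1).build();
    }

    @Bean
    public NewTopic repliesFromWorkersTopic() {
        // manager's input: a single partition
        return TopicBuilder.name("repliesFromWorkers").partitions(1).replicas(1).build();
    }
}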

The manager takes a file, creates multiple ExecutionContexts, and sends them over Kafka. The workers process their respective steps and send a message at the end of their processing. The manager aggregates the workers' results and completes the job once all workers are done. So far so good.

Now assume I first run a long-running job that requires lots of time to finish, and then I run a small job that finishes quickly. Not surprisingly, the second job finishes sooner and sends a completed signal; the manager consumes this message and continues the process. I even checked AggregatingMessageHandler: the completed message is related only to the second job (the short-running one). I verified this by the jobExecutionId.

Now the problem happens. I have a JobListener with an afterJob method. This method is run against the first job (the long-running one that is still being processed by the workers), not the second one (the short-running one for which the completed signal was sent)! I can tell by looking at the jobExecutionId. It's really weird, because I never saw a completion signal for the first job in the logs.

After some time, when the first, long-running job finishes, the final worker sends a completed message and the manager decides to finish the job; now the JobListener is run against the second job (the short-running one)!

I can't understand what goes wrong. I would assume it's probably a misconfiguration, but by debugging the code and checking AggregatingMessageHandler and the TRACE logs in the workers and the manager, I can clearly see that the messages are sent fine and there's nothing wrong with them. Any suggestions/ideas are welcome.
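For anyone who wants to verify the correlation themselves, here is a debugging sketch (not part of my actual configuration): a ChannelInterceptor on the manager's repliesFromWorkers channel that logs which job execution each worker reply belongs to.

import org.springframework.batch.core.StepExecution;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.ChannelInterceptor;

@Bean
public DirectChannel repliesFromWorkers() {
    DirectChannel channel = new DirectChannel();
    channel.addInterceptor(new ChannelInterceptor() {
        @Override
        public Message<?> preSend(Message<?> message, MessageChannel ch) {
            // in remote partitioning, worker replies carry a StepExecution payload
            Object payload = message.getPayload();
            if (payload instanceof StepExecution) {
                StepExecution stepExecution = (StepExecution) payload;
                System.out.println("worker reply for jobExecutionId="
                        + stepExecution.getJobExecutionId()
                        + ", step=" + stepExecution.getStepName());
            }
            return message;
        }
    });
    return channel;
}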

UPDATE

Here is a sample implementation. Let's say we have a Customer table. The job takes minId and maxId parameters (the ID column in the Customer table is a simple number), and the manager creates multiple ExecutionContexts based on that id range.
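For completeness, a launch sketch with hypothetical id bounds (jobLauncher and job refer to the beans configured below):

import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;

JobParameters params = new JobParametersBuilder()
        .addLong("minId", 1L)      // lower bound of the Customer id range
        .addLong("maxId", 1000L)   // upper bound of the Customer id range
        .toJobParameters();
JobExecution execution = jobLauncher.run(job, params);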

manager config

package com.example.batchdemo.job;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.core.explore.JobExplorer;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.core.partition.support.Partitioner;
import org.springframework.batch.integration.partition.RemotePartitioningManagerStepBuilderFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Profile;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.kafka.dsl.Kafka;
import org.springframework.integration.scheduling.PollerMetadata;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.ConsumerProperties;
import org.springframework.scheduling.support.PeriodicTrigger;

@Profile("!worker")
@Configuration
public class JobConfiguration {

    private final JobBuilderFactory jobBuilderFactory;
    private final JobExplorer jobExplorer;
    private final RemotePartitioningManagerStepBuilderFactory managerStepBuilderFactory;
    private final JobListener jobListener;

    public JobConfiguration(JobBuilderFactory jobBuilderFactory, JobExplorer jobExplorer, RemotePartitioningManagerStepBuilderFactory managerStepBuilderFactory, JobListener jobListener) {
        this.jobBuilderFactory = jobBuilderFactory;
        this.jobExplorer = jobExplorer;
        this.managerStepBuilderFactory = managerStepBuilderFactory;
        this.jobListener = jobListener;
    }

    @Bean
    public Job job() {
        return jobBuilderFactory.get("job")
                .incrementer(new RunIdIncrementer())
                .start(managerStep())
                .listener(jobListener)
                .build();
    }

    @Bean
    public Step managerStep() {
        return managerStepBuilderFactory.get("managerStep")
                .partitioner("workerStep", rangePartitioner(null, null))
                .outputChannel(requestForWorkers())
                .inputChannel(repliesFromWorkers())
                .jobExplorer(jobExplorer)
                .build();
    }

    @Bean
    @StepScope
    public Partitioner rangePartitioner(@Value("#{jobParameters['minId']}") Integer minId, @Value("#{jobParameters['maxId']}") Integer maxId) {
        // CustomerIdRangePartitioner (below) also takes a grid size, i.e. the
        // width of each id range; 30 here is an assumed value, not from the post
        return new CustomerIdRangePartitioner(minId, maxId, 30);
    }


    ////////////////////////////////////////////////////////////////////////////////////////////////

    @Bean
    public DirectChannel requestForWorkers() {
        return new DirectChannel();
    }

    @Bean
    public IntegrationFlow outboundFlow(KafkaTemplate kafkaTemplate) {
        // the Kafka outbound channel adapter is a terminal (one-way) endpoint,
        // so the flow ends right after handle()
        return IntegrationFlows
                .from(requestForWorkers())
                .handle(Kafka.outboundChannelAdapter(kafkaTemplate).topic("requestForWorkers"))
                .get();
    }

    @Bean
    public DirectChannel repliesFromWorkers() {
        return new DirectChannel();
    }

    @Bean
    public IntegrationFlow inboundFlow(ConsumerFactory consumerFactory) {
        return IntegrationFlows
                .from(Kafka.inboundChannelAdapter(consumerFactory, new ConsumerProperties("repliesFromWorkers")))
                .channel(repliesFromWorkers())
                .get();
    }

    @Bean(name = PollerMetadata.DEFAULT_POLLER)
    public PollerMetadata defaultPoller() {
        PollerMetadata pollerMetadata = new PollerMetadata();
        pollerMetadata.setTrigger(new PeriodicTrigger(10));
        return pollerMetadata;
    }

}

worker config

package com.example.batchdemo.job;

import com.example.batchdemo.domain.Customer;
import com.example.batchdemo.domain.CustomerRowMapper;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.integration.partition.RemotePartitioningWorkerStepBuilderFactory;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.database.JdbcPagingItemReader;
import org.springframework.batch.item.database.Order;
import org.springframework.batch.item.database.support.MySqlPagingQueryProvider;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Profile;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.kafka.dsl.Kafka;
import org.springframework.integration.scheduling.PollerMetadata;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.ConsumerProperties;
import org.springframework.scheduling.support.PeriodicTrigger;

import javax.sql.DataSource;
import java.util.HashMap;
import java.util.Map;

@Configuration
@Profile("worker")
public class WorkerConfiguration {

    private static final int CHUNK_SIZE = 10;
    private static final int WAITING_TIME = 3000;

    public final DataSource dataSource;
    private final RemotePartitioningWorkerStepBuilderFactory workerStepBuilderFactory;

    public WorkerConfiguration(DataSource dataSource, RemotePartitioningWorkerStepBuilderFactory workerStepBuilderFactory) {
        this.dataSource = dataSource;
        this.workerStepBuilderFactory = workerStepBuilderFactory;
    }

    @Bean
    public DirectChannel repliesFromWorkers() {
        return new DirectChannel();
    }

    @Bean
    public IntegrationFlow outboundFlow(KafkaTemplate kafkaTemplate) {
        // same as on the manager side: the outbound adapter is one-way,
        // so nothing is chained after handle()
        return IntegrationFlows
                .from(repliesFromWorkers())
                .handle(Kafka.outboundChannelAdapter(kafkaTemplate).topic("repliesFromWorkers"))
                .get();
    }

    @Bean
    public DirectChannel requestForWorkers() {
        return new DirectChannel();
    }

    @Bean
    public IntegrationFlow inboundFlow(ConsumerFactory consumerFactory) {
        return IntegrationFlows
                .from(Kafka.inboundChannelAdapter(consumerFactory, new ConsumerProperties("requestForWorkers")))
                .channel(requestForWorkers())
                .get();
    }

    @Bean(name = PollerMetadata.DEFAULT_POLLER)
    public PollerMetadata defaultPoller() {
        PollerMetadata pollerMetadata = new PollerMetadata();
        pollerMetadata.setTrigger(new PeriodicTrigger(10));
        return pollerMetadata;
    }

    /////////////////////////////////////////////////////////////////
    /////////////////////////////////////////////////////////////////

    @Bean
    public Step workerStep() {
        return workerStepBuilderFactory.get("workerStep")
                .inputChannel(requestForWorkers())
                .outputChannel(repliesFromWorkers())
                .<Customer, Customer>chunk(CHUNK_SIZE)
                .reader(pagingItemReader(null, null))
                .processor(itemProcessor())
                .writer(customerItemWriter())
                .build();
    }

    @Bean
    @StepScope
    public JdbcPagingItemReader<Customer> pagingItemReader(@Value("#{stepExecutionContext['minValue']}") Long minValue,
                                                           @Value("#{stepExecutionContext['maxValue']}") Long maxValue) {
        System.out.println("reading " + minValue + " to " + maxValue);
        JdbcPagingItemReader<Customer> reader = new JdbcPagingItemReader<>();

        reader.setDataSource(this.dataSource);
        reader.setFetchSize(1000);
        reader.setRowMapper(new CustomerRowMapper());

        MySqlPagingQueryProvider queryProvider = new MySqlPagingQueryProvider();
        queryProvider.setSelectClause("id, firstName, lastName, birthdate");
        queryProvider.setFromClause("from CUSTOMER");
        queryProvider.setWhereClause("where id >= " + minValue + " and id < " + maxValue);

        Map<String, Order> sortKeys = new HashMap<>(1);

        sortKeys.put("id", Order.ASCENDING);

        queryProvider.setSortKeys(sortKeys);

        reader.setQueryProvider(queryProvider);

        return reader;
    }

    @Bean
    @StepScope
    public ItemProcessor<Customer, Customer> itemProcessor() {
        return item -> {
            Thread.sleep(WAITING_TIME);
            System.out.println(item);
            return item;
        };
    }

    @Bean
    @StepScope
    public ItemWriter<Customer> customerItemWriter() {
        return items -> {
            System.out.printf("%d items were written%n", items.size());
        };
    }

}

Partitioner:

package com.example.batchdemo.job;

import org.springframework.batch.core.partition.support.Partitioner;
import org.springframework.batch.item.ExecutionContext;

import java.util.HashMap;
import java.util.Map;

public class CustomerIdRangePartitioner implements Partitioner {

    private final int minId;
    private final int maxId;
    private final int gridSize;

    public CustomerIdRangePartitioner(int minId, int maxId, int gridSize) {
        this.minId = minId;
        this.maxId = maxId;
        this.gridSize = gridSize;
    }

    @Override
    public Map<String, ExecutionContext> partition(int gridSize) {
        int number = (maxId - minId) / this.gridSize + 1;

        Map<String, ExecutionContext> result = new HashMap<>();
        for (int i = 0; i < number; i++) {
            ExecutionContext executionContext = new ExecutionContext();
            int start = minId + (this.gridSize * i);
            // each partition covers [start, start + gridSize); the worker's reader
            // uses maxValue as an exclusive upper bound (id < maxValue), so the
            // last range is capped at maxId + 1 to include the row with id == maxId
            int end = start + this.gridSize;
            executionContext.putInt("minValue", start);
            executionContext.putInt("maxValue", Math.min(end, maxId + 1));
            result.put("partition" + i, executionContext);
        }

        return result;
    }
}
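To make the range arithmetic concrete, a quick usage sketch with hypothetical bounds (note that the gridSize argument passed to partition() is ignored; the constructor value is used):

// minId = 1, maxId = 100, gridSize = 30 yields four partitions:
//   partition0 -> [1, 31), partition1 -> [31, 61),
//   partition2 -> [61, 91), partition3 -> [91, 101) (capped at maxId + 1)
Map<String, ExecutionContext> partitions =
        new CustomerIdRangePartitioner(1, 100, 30).partition(4);
partitions.forEach((name, ctx) ->
        System.out.println(name + " -> [" + ctx.getInt("minValue")
                + ", " + ctx.getInt("maxValue") + ")"));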

JobListener

package com.example.batchdemo.job;

import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobExecutionListener;
import org.springframework.batch.core.configuration.annotation.JobScope;
import org.springframework.stereotype.Component;

@Component
@JobScope
public class JobListener implements JobExecutionListener {

    @Override
    public void beforeJob(JobExecution jobExecution) {

    }

    @Override
    public void afterJob(JobExecution jobExecution) {
        System.out.println(jobExecution.getJobId() + " was finished: " + jobExecution.getStatus());
    }

}

AppConfiguration

package com.example.batchdemo.controller;

import org.springframework.batch.core.configuration.JobRegistry;
import org.springframework.batch.core.configuration.support.JobRegistryBeanPostProcessor;
import org.springframework.batch.core.converter.DefaultJobParametersConverter;
import org.springframework.batch.core.explore.JobExplorer;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.launch.JobOperator;
import org.springframework.batch.core.launch.support.SimpleJobLauncher;
import org.springframework.batch.core.launch.support.SimpleJobOperator;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

@Configuration
public class AppConfiguration {

    private final JobExplorer jobExplorer;
    private final JobRepository jobRepository;
    private final JobRegistry jobRegistry;
    private final ApplicationContext applicationContext;

    public AppConfiguration(JobExplorer jobExplorer, JobRepository jobRepository, JobRegistry jobRegistry, ApplicationContext applicationContext) {
        this.jobExplorer = jobExplorer;
        this.jobRepository = jobRepository;
        this.jobRegistry = jobRegistry;
        this.applicationContext = applicationContext;
    }

    @Bean
    public JobRegistryBeanPostProcessor jobRegistrar() throws Exception {
        JobRegistryBeanPostProcessor registrar = new JobRegistryBeanPostProcessor();

        registrar.setJobRegistry(jobRegistry);
        registrar.setBeanFactory(applicationContext.getAutowireCapableBeanFactory());
        registrar.afterPropertiesSet();

        return registrar;
    }

    @Bean
    public JobOperator jobOperator() throws Exception {
        SimpleJobOperator simpleJobOperator = new SimpleJobOperator();

        simpleJobOperator.setJobLauncher(getJobLauncher());
        simpleJobOperator.setJobParametersConverter(new DefaultJobParametersConverter());
        simpleJobOperator.setJobRepository(this.jobRepository);
        simpleJobOperator.setJobExplorer(this.jobExplorer);
        simpleJobOperator.setJobRegistry(this.jobRegistry);

        simpleJobOperator.afterPropertiesSet();

        return simpleJobOperator;
    }

    @Bean
    public JobLauncher getJobLauncher() throws Exception {
        SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
        jobLauncher.setJobRepository(jobRepository);
        jobLauncher.setTaskExecutor(jobOperatorExecutor());
        jobLauncher.afterPropertiesSet();
        return jobLauncher;
    }

    @Bean
    public ThreadPoolTaskExecutor jobOperatorExecutor() {
        ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
        threadPoolTaskExecutor.setCorePoolSize(64);
        threadPoolTaskExecutor.setMaxPoolSize(256);
        threadPoolTaskExecutor.setWaitForTasksToCompleteOnShutdown(true);
        return threadPoolTaskExecutor;
    }

}

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.5.6</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.example</groupId>
    <artifactId>batch-demo</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>batch-demo</name>
    <description>Demo project for Spring Boot</description>
    <properties>
        <java.version>11</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-batch</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-jdbc</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.batch</groupId>
            <artifactId>spring-batch-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.batch</groupId>
            <artifactId>spring-batch-integration</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-kafka</artifactId>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

Comments (1)

通知家属抬走 2025-02-14 08:09:36

This is a bug in Spring Batch. The listener is indeed called for the job that finishes earlier with the wrong JobExecution instance. Making the JobExecutionListener job-scoped does not solve the issue.

I will re-open the issue on GitHub for further investigation.
