Unable to run multiple jobs with Quartz

Posted on 2024-08-15 17:49:12


Hi, I am trying to run two jobs using the Spring Batch framework.
My problem is that the SimpleJobLauncher only runs one job, the last one in the jobs list.
Here is what I am doing:
I have two jobs in my database, along with the steps for each job.
I read the job data from the database and process it as follows:


public class BatchJobScheduler {
private static Log sLog = LogFactory.getLog(BatchJobScheduler.class);
private ApplicationContext ac;
private DataSourceTransactionManager mTransactionManager;
private SimpleJobLauncher mJobLauncher;
private JobRepository mJobRepository;
private SimpleStepFactoryBean stepFactory;
private MapJobRegistry mapJobRegistry;
private JobDetailBean jobDetail;
private CronTriggerBean cronTrigger;
private SimpleJob job;
private SchedulerFactoryBean schedulerFactory;
private static String mDriverClass;
private static String mConnectionUrl;
private static String mUser;
private static String mPassword;
public static JobMetaDataFeeder metadataFeeder;
static {
    try {
        loadProperties();
        metadataFeeder = new JobMetaDataFeeder();
        metadataFeeder.configureDataSource(mDriverClass, mConnectionUrl,
                mUser, mPassword);
    } catch (FileNotFoundException e) {
    } catch (IOException e) {
    } catch (SQLException e) {
    } catch (ClassNotFoundException e) {
    }
}

private static void loadProperties() throws FileNotFoundException,
        IOException {
    Properties properties = new Properties();
    InputStream is;
    if (BatchJobScheduler.class.getClassLoader() != null) {
        is = BatchJobScheduler.class.getClassLoader().getResourceAsStream(
                "batch.properties");
    } else {
        is = System.class.getClassLoader().getResourceAsStream(
                "batch.properties");
    }
    properties.load(is);
    mDriverClass = properties.getProperty("batch.jdbc.driver");
    mConnectionUrl = properties.getProperty("batch.jdbc.url");
    mUser = properties.getProperty("batch.jdbc.user");
    mPassword = properties.getProperty("batch.jdbc.password");
}

public void start(WebApplicationContext wac) throws Exception {
    try {
        ac = new FileSystemXmlApplicationContext("batch-spring.xml");
        mTransactionManager = (DataSourceTransactionManager) ac
                .getBean("mTransactionManager");
        mJobLauncher = (SimpleJobLauncher) ac.getBean("mJobLauncher");
        mJobRepository = (JobRepository) ac.getBean("mRepositoryFactory");
        mJobLauncher.afterPropertiesSet();
        List<JobMetadata> jobsMetaData = getJobsData(mDriverClass,
                mConnectionUrl, mUser, mPassword, null);
        createAndRunScheduler(jobsMetaData);
    } catch (Exception e) {
        e.printStackTrace();
        sLog.error("Exception while starting job", e);
    }
}

@SuppressWarnings("unchecked")
public List<CronTriggerBean> getJobTriggers(List<JobMetadata> jobsMetaData)
        throws Exception {
    List<CronTriggerBean> triggers = new ArrayList<CronTriggerBean>();
    for (JobMetadata jobMetadata : jobsMetaData) {
        job = (SimpleJob) ac.getBean("job");
        job.setName(jobMetadata.getJobName());
        ArrayList<Step> steps = new ArrayList<Step>();
        for (StepMetadata stepMetadata : jobMetadata.getSteps()) {
            // System.err.println(ac.getBean("stepFactory").getClass());
            stepFactory = new SimpleStepFactoryBean<String, Object>();
            stepFactory.setTransactionManager(mTransactionManager);
            stepFactory.setJobRepository(mJobRepository);
            stepFactory.setCommitInterval(stepMetadata.getCommitInterval());
            stepFactory.setStartLimit(stepMetadata.getStartLimit());
            T5CItemReader itemReader = (T5CItemReader) BeanUtils
                    .instantiateClass(Class.forName(stepMetadata
                            .getStepReaderClass()));
            itemReader
                    .setItems(getItemList(jobMetadata.getJobParameters()));
            stepFactory.setItemReader(itemReader);
            stepFactory.setItemProcessor((ItemProcessor) BeanUtils
                    .instantiateClass(Class.forName(stepMetadata
                            .getStepProcessorClass())));
            stepFactory.setItemWriter((ItemWriter) BeanUtils
                    .instantiateClass(Class.forName(stepMetadata
                            .getStepWriterClass())));
            stepFactory.setBeanName(stepMetadata.getStepName());
            steps.add((Step) stepFactory.getObject());
        }
        job.setSteps(steps);
        ReferenceJobFactory jobFactory = new ReferenceJobFactory(job);
        mapJobRegistry = (MapJobRegistry) ac.getBean("jobRegistry");
        mapJobRegistry.register(jobFactory);
        jobDetail = (JobDetailBean) ac.getBean("jobDetail");
        jobDetail.setJobClass(Class.forName(jobMetadata.getMJoblauncher()));
        jobDetail.setGroup(jobMetadata.getJobGroupName());
        jobDetail.setName(jobMetadata.getJobName());
        Map<String, Object> jobDataMap = new HashMap<String, Object>();
        jobDataMap.put("jobName", jobMetadata.getJobName());
        jobDataMap.put("jobLocator", mapJobRegistry);
        jobDataMap.put("jobLauncher", mJobLauncher);
        jobDataMap.put("timestamp", new Date());
        // jobDataMap.put("jobParams", jobMetadata.getJobParameters());
        jobDetail.setJobDataAsMap(jobDataMap);
        jobDetail.afterPropertiesSet();
        cronTrigger = (CronTriggerBean) ac.getBean("cronTrigger");
        cronTrigger.setJobDetail(jobDetail);
        cronTrigger.setJobName(jobMetadata.getJobName());
        cronTrigger.setJobGroup(jobMetadata.getJobGroupName());
        cronTrigger.setCronExpression(jobMetadata.getCronExpression());
        triggers.add(cronTrigger);
    }
    return triggers;
}

private void createAndRunScheduler(List<JobMetadata> jobsMetaData)
        throws Exception {
    // System.err.println(ac.getBean("schedulerFactory").getClass());
    schedulerFactory = new SchedulerFactoryBean();
    List<CronTriggerBean> triggerList = getJobTriggers(jobsMetaData);
    Trigger[] triggers = new Trigger[triggerList.size()];
    int triggerCount = 0;
    for (CronTriggerBean trigger : triggerList) {
        triggers[triggerCount] = trigger;
        triggerCount++;
    }
    schedulerFactory.setTriggers(triggers);
    schedulerFactory.afterPropertiesSet();
}

private List<JobMetadata> getJobsData(String driverClass,
        String connectionURL, String user, String password, String query)
        throws SQLException, ClassNotFoundException {
    metadataFeeder.createJobMetadata(query);
    return metadataFeeder.getJobsMetadata();
}

private List<String> getItemList(String jobParameterString) {
    List<String> itemList = new ArrayList<String>();
    String[] parameters = jobParameterString.split(";");
    for (String string : parameters) {
        String[] mapKeyValue = string.split("=");
        if (mapKeyValue.length == 2) {
            itemList.add(mapKeyValue[0] + ":" + mapKeyValue[1]);
        } else {
            // exception for invalid job parameters
            System.out.println("exception for invalid job parameters");
        }
    }
    return itemList;
}

private Map<String, Object> getParameterMap(String jobParameterString) {
    Map<String, Object> parameterMap = new HashMap<String, Object>();
    String[] parameters = jobParameterString.split(";");
    for (String string : parameters) {
        String[] mapKeyValue = string.split("=");
        if (mapKeyValue.length == 2) {
            parameterMap.put(mapKeyValue[0], mapKeyValue[1]);
        } else {
            // exception for invalid job parameters
            System.out.println("exception for invalid job parameters");
        }
    }
    return parameterMap;
}

}

public class MailJobLauncher extends QuartzJobBean {
/**
 * Special key in job data map for the name of the job to run.
 */
static final String JOB_NAME = "jobName";
private static Log sLog = LogFactory.getLog(MailJobLauncher.class);
private JobLocator mJobLocator;
private JobLauncher mJobLauncher;

/**
 * Public setter for the {@link JobLocator}.
 * 
 * @param jobLocator
 *            the {@link JobLocator} to set
 */
public void setJobLocator(JobLocator jobLocator) {
    this.mJobLocator = jobLocator;
}

/**
 * Public setter for the {@link JobLauncher}.
 * 
 * @param jobLauncher
 *            the {@link JobLauncher} to set
 */
public void setJobLauncher(JobLauncher jobLauncher) {
    this.mJobLauncher = jobLauncher;
}

@Override
@SuppressWarnings("unchecked")
protected void executeInternal(JobExecutionContext context) {
    Map<String, Object> jobDataMap = context.getMergedJobDataMap();
    executeRecursive(jobDataMap);
}

private void executeRecursive(Map<String, Object> jobDataMap) {
    String jobName = (String) jobDataMap.get(JOB_NAME);
    JobParameters jobParameters = getJobParametersFromJobMap(jobDataMap);
    sLog.info("Quartz trigger firing with Spring Batch jobName=" + jobName
            + jobDataMap + jobParameters);
    try {
        mJobLauncher.run(mJobLocator.getJob(jobName), jobParameters);
    } catch (JobInstanceAlreadyCompleteException e) {
        jobDataMap.remove("timestamp");
        jobDataMap.put("timestamp", new Date());
        executeRecursive(jobDataMap);
    } catch (NoSuchJobException e) {
        sLog.error("Could not find job.", e);
    } catch (JobExecutionException e) {
        sLog.error("Could not execute job.", e);
    }
}

/*
 * Copy parameters that are of the correct type over to {@link
 * JobParameters}, ignoring jobName.
 * @return a {@link JobParameters} instance
 */
private JobParameters getJobParametersFromJobMap(
        Map<String, Object> jobDataMap) {
    JobParametersBuilder builder = new JobParametersBuilder();
    for (Entry<String, Object> entry : jobDataMap.entrySet()) {
        String key = entry.getKey();
        Object value = entry.getValue();
        if (value instanceof String && !key.equals(JOB_NAME)) {
            builder.addString(key, (String) value);
        } else if (value instanceof Float || value instanceof Double) {
            builder.addDouble(key, ((Number) value).doubleValue());
        } else if (value instanceof Integer || value instanceof Long) {
            builder.addLong(key, ((Number) value).longValue());
        } else if (value instanceof Date) {
            builder.addDate(key, (Date) value);
        } else {
            sLog.debug("JobDataMap contains values which are not job parameters (ignoring).");
        }
    }
    return builder.toJobParameters();
}

}

I could not figure out why the launcher is ignoring all the other jobs. Please help me.
Regards


Comments (1)

乱了心跳 2024-08-22 17:49:12


Make sure these properties are set:

org.quartz.threadPool.class=org.quartz.simpl.SimpleThreadPool
org.quartz.threadPool.threadCount=3
org.quartz.threadPool.threadPriority=5

This will allow a few jobs to run at the same time. Adjust the settings as needed.
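As a side note, here is a minimal sketch of how these Quartz properties could be supplied programmatically through Spring's SchedulerFactoryBean rather than through a quartz.properties file; the class name and the way the factory is wired are illustrative assumptions, not part of the original post:

import java.util.Properties;

import org.springframework.scheduling.quartz.SchedulerFactoryBean;

public class SchedulerConfigSketch {

    // Configures the Quartz thread pool so that several triggers can fire
    // concurrently; the threadCount of 3 mirrors the value suggested above.
    public static SchedulerFactoryBean createSchedulerFactory() {
        Properties quartzProperties = new Properties();
        quartzProperties.setProperty("org.quartz.threadPool.class",
                "org.quartz.simpl.SimpleThreadPool");
        quartzProperties.setProperty("org.quartz.threadPool.threadCount", "3");
        quartzProperties.setProperty("org.quartz.threadPool.threadPriority", "5");

        SchedulerFactoryBean schedulerFactory = new SchedulerFactoryBean();
        schedulerFactory.setQuartzProperties(quartzProperties);
        // Triggers would still be registered as in the question's
        // createAndRunScheduler method, e.g. schedulerFactory.setTriggers(triggers);
        return schedulerFactory;
    }
}

Alternatively, the same three properties can live in a quartz.properties file referenced through SchedulerFactoryBean.setConfigLocation(...).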
