Spring Integration and Batch: job runs on every poll

Posted on 2025-01-10 02:29:23

I don't understand how Spring Integration behaves with JobLaunchingGateway. Here is my example configuration:

    public SftpInboundChannelAdapterSpec sftpInboundChannelAdapterSpec() {
        return Sftp.inboundAdapter(ftpFileSessionFactory())
                .preserveTimestamp(true)
                .deleteRemoteFiles(false)
                .remoteDirectory(integrationProperties.getRemoteDirectory())
                .filter(sftpFileListFilter())
                .localDirectory(new File(integrationProperties.getLocalDirectory()));
    }

    public PollerSpec pollerSpec() {
        PollerSpec cron = Pollers.cron(integrationProperties.getPollerCron());
        cron.maxMessagesPerPoll(integrationProperties.getMessagePerPoll());

        return cron;
    }

    @Bean
    public IntegrationFlow sftpInboundFlow() {
        return IntegrationFlows.from(sftpInboundChannelAdapterSpec(), pc -> pc.poller(pollerSpec()))
                .transform(fileMessageToJobRequest())
                .handle(jobLaunchingGateway())
                .handle(message -> {
                    logger.info("Handle message: {}", message.getPayload());
                })
                .get();
    }

    @Bean
    public JobLaunchingGateway jobLaunchingGateway() {
        SimpleJobLauncher simpleJobLauncher = new SimpleJobLauncher();
        simpleJobLauncher.setJobRepository(jobRepository);
        simpleJobLauncher.setTaskExecutor(new SyncTaskExecutor());

        return new JobLaunchingGateway(simpleJobLauncher);
    }

    private ChainFileListFilter<ChannelSftp.LsEntry> sftpFileListFilter() {
        ChainFileListFilter<ChannelSftp.LsEntry> chainFileListFilter = new ChainFileListFilter<>();
        chainFileListFilter.addFilter(new SftpSimplePatternFileListFilter("*.xlsx"));
        chainFileListFilter.addFilter(new SftpPersistentAcceptOnceFileListFilter(metadataStore(), "INT"));
        return chainFileListFilter;
    }

If I set the poller to run every minute, a job is created every minute. I don't see any new records in the MetadataStore.

When I comment out the line with .handle(jobLaunchingGateway()):

    @Bean
    public IntegrationFlow sftpInboundFlow() {
        return IntegrationFlows.from(sftpInboundChannelAdapterSpec(), pc -> pc.poller(pollerSpec()))
                .transform(fileMessageToJobRequest())
//                .handle(jobLaunchingGateway())
                .handle(message -> {
                    logger.info("Handle message: {}", message.getPayload());
                })
                .get();
    }

Everything works as expected.

I expected the SFTP adapter to fetch the new file(s) and then create one new job for each file.
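
To make that expectation concrete, here is a minimal, dependency-free sketch of the accept-once behavior I have in mind. It is not the actual SftpPersistentAcceptOnceFileListFilter code: the class name AcceptOnceSketch, the file name report.xlsx, and the in-memory map standing in for the metadata store are all assumptions made for illustration. My understanding is that the real filter stores each accepted file under a prefixed key and only accepts it again if its timestamp changes.

```java
import java.util.HashMap;
import java.util.Map;

public class AcceptOnceSketch {

    // Hypothetical stand-in for the metadata store: key -> last seen modification time.
    private final Map<String, Long> store = new HashMap<>();
    private final String prefix;

    public AcceptOnceSketch(String prefix) {
        this.prefix = prefix;
    }

    // Accepts a file the first time it is seen, or again if its timestamp changed.
    public boolean accept(String fileName, long modified) {
        Long previous = store.put(prefix + fileName, modified);
        return previous == null || previous != modified;
    }

    // Number of keys currently recorded in the simulated store.
    public int storeSize() {
        return store.size();
    }

    // Simulates several polls over the same unchanged file and counts
    // how many times a job would be launched for it.
    public static int countLaunches(int polls) {
        AcceptOnceSketch filter = new AcceptOnceSketch("INT");
        int launched = 0;
        for (int poll = 0; poll < polls; poll++) {
            if (filter.accept("report.xlsx", 1000L)) {
                launched++; // one "job" per accepted file
            }
        }
        return launched;
    }

    public static void main(String[] args) {
        System.out.println(countLaunches(3)); // prints 1
    }
}
```

In this model the same unchanged file seen on three polls is accepted only once and leaves one record in the store, which is the behavior I expected from the real flow: one job per file, not one job per poll.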

I don't understand why I don't see records in the MetadataStore when JobLaunchingGateway is enabled.

Can you help me and explain this?

I am new to Spring Integration and Spring Batch.
