Spring 集成和批处理作业在每个轮询中运行
我不明白 Spring 与 JobLaunchingGateway 行为的集成。我有示例配置:
public SftpInboundChannelAdapterSpec sftpInboundChannelAdapterSpec() {
return Sftp.inboundAdapter(ftpFileSessionFactory())
.preserveTimestamp(true)
.deleteRemoteFiles(false)
.remoteDirectory(integrationProperties.getRemoteDirectory())
.filter(sftpFileListFilter())
.localDirectory(new File(integrationProperties.getLocalDirectory()));
}
public PollerSpec pollerSpec() {
PollerSpec cron = Pollers.cron(integrationProperties.getPollerCron());
cron.maxMessagesPerPoll(integrationProperties.getMessagePerPoll());
return cron;
}
@Bean
public IntegrationFlow sftpInboundFlow() {
return IntegrationFlows.from(sftpInboundChannelAdapterSpec(), pc -> pc.poller(pollerSpec()))
.transform(fileMessageToJobRequest())
.handle(jobLaunchingGateway())
.handle(message -> {
logger.info("Handle message: {}", message.getPayload());
})
.get();
}
@Bean
public JobLaunchingGateway jobLaunchingGateway() {
SimpleJobLauncher simpleJobLauncher = new SimpleJobLauncher();
simpleJobLauncher.setJobRepository(jobRepository);
simpleJobLauncher.setTaskExecutor(new SyncTaskExecutor());
JobLaunchingGateway jobLaunchingGateway = new JobLaunchingGateway(simpleJobLauncher);
return jobLaunchingGateway;
}
private ChainFileListFilter<ChannelSftp.LsEntry> sftpFileListFilter() {
ChainFileListFilter<ChannelSftp.LsEntry> chainFileListFilter = new ChainFileListFilter<>();
chainFileListFilter.addFilter(new SftpSimplePatternFileListFilter("*.xlsx"));
chainFileListFilter.addFilter(new SftpPersistentAcceptOnceFileListFilter(metadataStore(), "INT"));
return chainFileListFilter;
}
如果我设置每 1 分钟轮询一次,则每分钟都会创建该作业。我在 MetaDataStore
中没有看到任何新记录。
当我用 .handle(jobLaunchingGateway())
注释行时,
@Bean
public IntegrationFlow sftpInboundFlow() {
return IntegrationFlows.from(sftpInboundChannelAdapterSpec(), pc -> pc.poller(pollerSpec()))
.transform(fileMessageToJobRequest())
// .handle(jobLaunchingGateway())
.handle(message -> {
logger.info("Handle message: {}", message.getPayload());
})
.get();
}
一切都按预期进行。
我期望 SFTP 获取新文件,然后为每个文件创建新作业。
我不明白为什么启用 JobLaunchingGateway
后在 MetaDataStore
中看不到记录。
你能帮我解释一下吗?
我是 Spring 集成和批处理的新手?
I don't understand spring integration with JobLaunchingGateway
behavior. I have example config:
public SftpInboundChannelAdapterSpec sftpInboundChannelAdapterSpec() {
return Sftp.inboundAdapter(ftpFileSessionFactory())
.preserveTimestamp(true)
.deleteRemoteFiles(false)
.remoteDirectory(integrationProperties.getRemoteDirectory())
.filter(sftpFileListFilter())
.localDirectory(new File(integrationProperties.getLocalDirectory()));
}
public PollerSpec pollerSpec() {
PollerSpec cron = Pollers.cron(integrationProperties.getPollerCron());
cron.maxMessagesPerPoll(integrationProperties.getMessagePerPoll());
return cron;
}
@Bean
public IntegrationFlow sftpInboundFlow() {
return IntegrationFlows.from(sftpInboundChannelAdapterSpec(), pc -> pc.poller(pollerSpec()))
.transform(fileMessageToJobRequest())
.handle(jobLaunchingGateway())
.handle(message -> {
logger.info("Handle message: {}", message.getPayload());
})
.get();
}
@Bean
public JobLaunchingGateway jobLaunchingGateway() {
SimpleJobLauncher simpleJobLauncher = new SimpleJobLauncher();
simpleJobLauncher.setJobRepository(jobRepository);
simpleJobLauncher.setTaskExecutor(new SyncTaskExecutor());
JobLaunchingGateway jobLaunchingGateway = new JobLaunchingGateway(simpleJobLauncher);
return jobLaunchingGateway;
}
private ChainFileListFilter<ChannelSftp.LsEntry> sftpFileListFilter() {
ChainFileListFilter<ChannelSftp.LsEntry> chainFileListFilter = new ChainFileListFilter<>();
chainFileListFilter.addFilter(new SftpSimplePatternFileListFilter("*.xlsx"));
chainFileListFilter.addFilter(new SftpPersistentAcceptOnceFileListFilter(metadataStore(), "INT"));
return chainFileListFilter;
}
If I set polling every 1 minute, the job will be created every minute. I don't see any new record in MetaDataStore
.
When I comment line with .handle(jobLaunchingGateway())
@Bean
public IntegrationFlow sftpInboundFlow() {
return IntegrationFlows.from(sftpInboundChannelAdapterSpec(), pc -> pc.poller(pollerSpec()))
.transform(fileMessageToJobRequest())
// .handle(jobLaunchingGateway())
.handle(message -> {
logger.info("Handle message: {}", message.getPayload());
})
.get();
}
Everything works as expected.
I expected that SFTP fetch new file(s) and then create new job for each file.
I don't understand why I don't see records in MetaDataStore
when I JobLaunchingGateway
is enabled.
Can you help me and explain this?
I am new in spring integration and batch?
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论