将Spring Integration SftpinBoundChannel与Spring Batch结合使用
问题描述
亲爱的堆栈跨流社区, 最近,我遇到了春季融合框架的Sftpinbound频道。
是否可以通过批处理作业启动入站通道?
到现在为止,我的选拔器
已经创建了一个RunonCetrigger,并尝试通过以下代码启动通过Job-Request启动其入站通道:
@Bean
@InboundChannelAdapter(channel = "fromSftpChannel",poller = @Poller(trigger = "fireSftpOnceTrigger"),autoStartup =
"false")
//@InboundChannelAdapter(channel = "fromSftpChannel", poller = @Poller(fixedRate = "1000"))
public MessageSource<InputStream> sftpMessageSource() {
SftpStreamingMessageSource messageSource = new SftpStreamingMessageSource(template());
messageSource.setRemoteDirectory(sftpRemoteDirectoryDownload);
messageSource.setFilter(new AcceptAllFileListFilter<>());
messageSource.setMaxFetchSize(1);
return messageSource;
}
@Bean
public IntegrationFlow flow() {
return IntegrationFlows.from(Sftp.inboundStreamingAdapter(template())
.filter(new AcceptAllFileListFilter<>())
.remoteDirectory(sftpRemoteDirectoryDownload),
e -> e.id("sftpinboundpoller").autoStartup(false).poller(Pollers.trigger(fireSftpOnceTrigger)))
.channel(MessageChannels.queue("files"))
.get();
}
最后,我使用此代码触发了我的SftpPoller,但似乎只是触发SFTP,但没有触发文件阅读, -工作。
SourcePollingChannelAdapterFactoryBean factoryBean =
(SourcePollingChannelAdapterFactoryBean)flow().getIntegrationComponents().keySet().stream().findFirst().get();
fireSftpOnceTrigger.reset();
factoryBean.start();
也许有人知道通过批处理工作启动SFTP入站频道的更好方法?
最好的问候
Devron1705
Problem description
Dear Stack-Overflow-Community,
I recently came across sftpinbound channel from Spring-Integration-Framework.
Is is possible to start the inbound channel by batch job or not?
My tryouts until now
I already created a RunOnceTrigger and try to start its inbound-channel by job-request with the following code:
@Bean
@InboundChannelAdapter(channel = "fromSftpChannel",poller = @Poller(trigger = "fireSftpOnceTrigger"),autoStartup =
"false")
//@InboundChannelAdapter(channel = "fromSftpChannel", poller = @Poller(fixedRate = "1000"))
public MessageSource<InputStream> sftpMessageSource() {
SftpStreamingMessageSource messageSource = new SftpStreamingMessageSource(template());
messageSource.setRemoteDirectory(sftpRemoteDirectoryDownload);
messageSource.setFilter(new AcceptAllFileListFilter<>());
messageSource.setMaxFetchSize(1);
return messageSource;
}
@Bean
public IntegrationFlow flow() {
return IntegrationFlows.from(Sftp.inboundStreamingAdapter(template())
.filter(new AcceptAllFileListFilter<>())
.remoteDirectory(sftpRemoteDirectoryDownload),
e -> e.id("sftpinboundpoller").autoStartup(false).poller(Pollers.trigger(fireSftpOnceTrigger)))
.channel(MessageChannels.queue("files"))
.get();
}
And finally I use this code to trigger my sftppoller but it seems only to trigger the sftp but not the file-read-job.
SourcePollingChannelAdapterFactoryBean factoryBean =
(SourcePollingChannelAdapterFactoryBean)flow().getIntegrationComponents().keySet().stream().findFirst().get();
fireSftpOnceTrigger.reset();
factoryBean.start();
Maybe someone is knowing a better way to start a sftp inbound channel by an batch job?
Best regards
Devron1705
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论
评论(1)
查看我的错误
.Channel(MessageChannels.Queue(“ files”)) - &gt;意味着我的消息。我创建了一个名为FrofSFTP的频道,该频道将将传入的消息转换为文件流。可以通过将我以前的代码线更改为 .Channel(MessageChannels.Direct(“ FromSFTP”))及其完成来安排连接。
Looking at my errors
.channel(MessageChannels.queue("files")) --> means queing my messages. I've created an channel called fromSftp that will transform the incoming-messages to an file-stream. Connection could be arranged by changing my previous codeline to .channel(MessageChannels.direct("fromSftp")) and its done.