将Spring Integration SftpinBoundChannel与Spring Batch结合使用

发布于 2025-01-28 01:02:13 字数 1571 浏览 5 评论 0原文

问题描述

亲爱的堆栈跨流社区, 最近,我遇到了春季融合框架的Sftpinbound频道。

是否可以通过批处理作业启动入站通道?

到现在为止,我的选拔器

已经创建了一个RunonCetrigger,并尝试通过以下代码启动通过Job-Request启动其入站通道:

    @Bean
  @InboundChannelAdapter(channel = "fromSftpChannel",poller = @Poller(trigger = "fireSftpOnceTrigger"),autoStartup =
    "false")
  //@InboundChannelAdapter(channel = "fromSftpChannel", poller = @Poller(fixedRate = "1000"))
  public MessageSource<InputStream> sftpMessageSource() {
    SftpStreamingMessageSource messageSource = new SftpStreamingMessageSource(template());
    messageSource.setRemoteDirectory(sftpRemoteDirectoryDownload);
    messageSource.setFilter(new AcceptAllFileListFilter<>());
    messageSource.setMaxFetchSize(1);
    return messageSource;
  }
@Bean
  public IntegrationFlow flow() {
    return IntegrationFlows.from(Sftp.inboundStreamingAdapter(template())
          .filter(new AcceptAllFileListFilter<>())
          .remoteDirectory(sftpRemoteDirectoryDownload),
          e -> e.id("sftpinboundpoller").autoStartup(false).poller(Pollers.trigger(fireSftpOnceTrigger)))
      .channel(MessageChannels.queue("files"))
      .get();
  }

最后,我使用此代码触发了我的SftpPoller,但似乎只是触发SFTP,但没有触发文件阅读, -工作。

 SourcePollingChannelAdapterFactoryBean factoryBean =
      (SourcePollingChannelAdapterFactoryBean)flow().getIntegrationComponents().keySet().stream().findFirst().get();

    fireSftpOnceTrigger.reset();
    factoryBean.start();

也许有人知道通过批处理工作启动SFTP入站频道的更好方法?

最好的问候

Devron1705

Problem description

Dear Stack-Overflow-Community,
I recently came across sftpinbound channel from Spring-Integration-Framework.

Is is possible to start the inbound channel by batch job or not?

My tryouts until now

I already created a RunOnceTrigger and try to start its inbound-channel by job-request with the following code:

    @Bean
  @InboundChannelAdapter(channel = "fromSftpChannel",poller = @Poller(trigger = "fireSftpOnceTrigger"),autoStartup =
    "false")
  //@InboundChannelAdapter(channel = "fromSftpChannel", poller = @Poller(fixedRate = "1000"))
  public MessageSource<InputStream> sftpMessageSource() {
    SftpStreamingMessageSource messageSource = new SftpStreamingMessageSource(template());
    messageSource.setRemoteDirectory(sftpRemoteDirectoryDownload);
    messageSource.setFilter(new AcceptAllFileListFilter<>());
    messageSource.setMaxFetchSize(1);
    return messageSource;
  }
@Bean
  public IntegrationFlow flow() {
    return IntegrationFlows.from(Sftp.inboundStreamingAdapter(template())
          .filter(new AcceptAllFileListFilter<>())
          .remoteDirectory(sftpRemoteDirectoryDownload),
          e -> e.id("sftpinboundpoller").autoStartup(false).poller(Pollers.trigger(fireSftpOnceTrigger)))
      .channel(MessageChannels.queue("files"))
      .get();
  }

And finally I use this code to trigger my sftppoller but it seems only to trigger the sftp but not the file-read-job.

 SourcePollingChannelAdapterFactoryBean factoryBean =
      (SourcePollingChannelAdapterFactoryBean)flow().getIntegrationComponents().keySet().stream().findFirst().get();

    fireSftpOnceTrigger.reset();
    factoryBean.start();

Maybe someone is knowing a better way to start a sftp inbound channel by an batch job?

Best regards

Devron1705

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(1

|煩躁 2025-02-04 01:02:13

查看我的错误

  @Bean
  public IntegrationFlow flow() {
    return IntegrationFlows.from(Sftp.inboundStreamingAdapter(template())
          .filter(new AcceptAllFileListFilter<>())
          .remoteDirectory(sftpRemoteDirectoryDownload),
          e -> e.id("sftpinboundpoller").autoStartup(false).poller(Pollers.trigger(fireSftpOnceTrigger)))
      **.channel(MessageChannels.queue("files"))**
      .get();
  }

.Channel(MessageChannels.Queue(“ files”)) - &gt;意味着我的消息。我创建了一个名为FrofSFTP的频道,该频道将将传入的消息转换为文件流。可以通过将我以前的代码线更改为 .Channel(MessageChannels.Direct(“ FromSFTP”))及其完成来安排连接。

Looking at my errors

  @Bean
  public IntegrationFlow flow() {
    return IntegrationFlows.from(Sftp.inboundStreamingAdapter(template())
          .filter(new AcceptAllFileListFilter<>())
          .remoteDirectory(sftpRemoteDirectoryDownload),
          e -> e.id("sftpinboundpoller").autoStartup(false).poller(Pollers.trigger(fireSftpOnceTrigger)))
      **.channel(MessageChannels.queue("files"))**
      .get();
  }

.channel(MessageChannels.queue("files")) --> means queing my messages. I've created an channel called fromSftp that will transform the incoming-messages to an file-stream. Connection could be arranged by changing my previous codeline to .channel(MessageChannels.direct("fromSftp")) and its done.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文