Apache Camel Reactive Streams throws "The stream has no active subscriptions"

Posted on 2025-02-11 23:43:49

I'm trying to use Camel Reactive Streams together with Spring Boot and Reactor, using the following code:

package com.manning.camel.reactive;

import org.apache.camel.ProducerTemplate;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.reactive.streams.api.CamelReactiveStreamsService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Mono;

/**
 * A REST controller that is also a Camel route builder: the GET handler sends a
 * message to direct:works and returns a Mono backed by the "greet" stream.
 */
@RestController
public class MySpringBootRouter extends RouteBuilder {

  @Autowired
  private ProducerTemplate template;

  @Autowired
  private CamelReactiveStreamsService crss;

  @GetMapping
  public Mono<String> sayHi() {
    template.asyncSendBody("direct:works", "Hi");
    //return  Mono.from(crss.fromStream("greet", String.class));
    return Mono.from(crss.fromStream("greet", String.class));
  }

  @Override
  public void configure() {
    from("direct:works")
            .log("Fired")
            .to("reactive-streams:greet");
  }

}

After running the code, I get:

java.lang.IllegalStateException: The stream has no active subscriptions


Comments (1)

扛刀软妹 2025-02-18 23:43:49

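After a long time, I solved the error. As you can see, the router class logic changed a little: the subscriber is registered in MyService, and MyService is injected into the router so that the subscription exists before the route sends to the stream.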

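import jakarta.annotation.PostConstruct; // javax.annotation.PostConstruct on Spring Boot 2.x
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.camel.CamelContext;
import org.apache.camel.component.reactive.streams.api.CamelReactiveStreams;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Flux;

@Slf4j
@Service
@AllArgsConstructor
public class MyService {
   final CamelContext context;

   // Subscribes to the "numbers" stream when the bean is created,
   // so the stream has an active subscription before the route sends to it.
   @PostConstruct
   public void consumerData() {
     var rCamel = CamelReactiveStreams.get(context);
     var numbers = rCamel.fromStream("numbers", Integer.class);
     Flux.from(numbers).subscribe(e -> log.info("{}", e));
   }
}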

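import java.util.Random;
import lombok.NoArgsConstructor;
import org.apache.camel.builder.RouteBuilder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
@NoArgsConstructor
public class MyRouter extends RouteBuilder {

  // Injects the subscriber, which forces MyService to be created first
  @Autowired MyService service;

  @Override
  public void configure() {

    //onException(ReactiveStreamsNoActiveSubscriptionsException.class)
    //      .continued(true);

    // Every 2 seconds, produce a random integer in [0, 100)
    from("timer://reactiveApp?fixedRate=true&period=2s")
          .transform(method(Random.class, "nextInt(100)"))
          //.log("${body}");
          .to("direct:message");

    // Forward each number to the "numbers" reactive stream
    from("direct:message")
          //.log("${body}")
          .to("reactive-streams:numbers");
  }
}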
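
The ordering issue behind the error can be sketched without Camel at all. The snippet below is only an illustration under stated assumptions: HotStreamSketch is a hypothetical stand-in for a named stream, not Camel's implementation, and it simply mimics the behaviour seen above, where sending to a stream that nobody has subscribed to yet fails with an IllegalStateException. In the original controller, the message is most likely sent before the returned Mono is subscribed by the web framework, which matches the first half of this sketch; the fix corresponds to the second half, where the subscriber is registered first.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

/** Hypothetical stand-in for a named hot stream; NOT Camel's implementation. */
public class HotStreamSketch<T> {
    private final List<Consumer<T>> subscribers = new CopyOnWriteArrayList<>();

    /** Registers a subscriber; items published afterwards are delivered to it. */
    public void subscribe(Consumer<T> subscriber) {
        subscribers.add(subscriber);
    }

    /** Delivers an item to all subscribers, or fails if there are none. */
    public void publish(T item) {
        if (subscribers.isEmpty()) {
            throw new IllegalStateException("The stream has no active subscriptions");
        }
        for (Consumer<T> subscriber : subscribers) {
            subscriber.accept(item);
        }
    }

    public static void main(String[] args) {
        HotStreamSketch<String> greet = new HotStreamSketch<>();

        // Publishing first (like sending to the route before anyone subscribes) fails.
        try {
            greet.publish("Hi");
        } catch (IllegalStateException e) {
            System.out.println("publish before subscribe failed: " + e.getMessage());
        }

        // Subscribing first (like the @PostConstruct subscriber) lets the item through.
        List<String> received = new ArrayList<>();
        greet.subscribe(received::add);
        greet.publish("Hi");
        System.out.println("received after subscribing: " + received);
    }
}
```

The design point is only the order of operations: whatever consumes the stream has to be subscribed before the producing side sends its first message.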