UNI等待VERTX EventBus消息

发布于 2025-01-27 05:49:08 字数 810 浏览 3 评论 0原文

我有两个端点:

    @GET
    @Produces(MediaType.TEXT_PLAIN)
    @Path("/waitForEvent")
    public Uni<Object> waitForEvent() {
        return Uni.createFrom().emitter(em -> {
            //wait for event from eventBus
    //            eventBus.consumer("test", msg -> {
    //                System.out.printf("receive event: %s\n", msg.body());
    //                em.complete(msg);
    //            });
        }).ifNoItem().after(Duration.ofSeconds(5)).failWith(new RuntimeException("timeout"));
    }
    
    @GET
    @Path("/send")
    public void test() {
        System.out.println("send event");
        eventBus.send("test", "send test event");
    }

wareforevent()仅在从EventBus接收事件的情况下才能完成。如何使用Vertx和Mutiny实现这一目标?

I have two endpoints:

    @GET
    @Produces(MediaType.TEXT_PLAIN)
    @Path("/waitForEvent")
    public Uni<Object> waitForEvent() {
        return Uni.createFrom().emitter(em -> {
            //wait for event from eventBus
    //            eventBus.consumer("test", msg -> {
    //                System.out.printf("receive event: %s\n", msg.body());
    //                em.complete(msg);
    //            });
        }).ifNoItem().after(Duration.ofSeconds(5)).failWith(new RuntimeException("timeout"));
    }
    
    @GET
    @Path("/send")
    public void test() {
        System.out.println("send event");
        eventBus.send("test", "send test event");
    }

The waitForEvent() should only complete if it receives the event from the eventBus. How can I achieve this using vertx and mutiny?

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(1

梦忆晨望 2025-02-03 05:49:08

通常,我们避免使用这种模式,并使用事件总线中的请求/回复机制:

@GET
@Path("/send")
public Uni<String> test() {
   return bus.<String>request("test", name)        
        .onItem().transform(Message::body)
        .ifNoItem().after(Duration.ofSeconds(5)).failWith(new RuntimeException("timeout"));
}

使用两个端点实现(如问题)时,它可能会变得更加复杂,好像您对/wateforevent 端点,您需要确保每个“消费者”收到消息。

仍然有可能,但是将需要这样的东西:

@GET
@Produces(MediaType.TEXT_PLAIN)
@Path("/waitForEvent")
public Uni<String> waitForEvent() {
  return Uni.createFrom().emitter(emitter -> {
    MessageConsumer<String> consumer = bus.consumer("test");
      consumer.handler(m -> {
      emitter.complete(m.body());
      consumer.unregisterAndForget();
   })
        .ifNoItem().after(Duration.ofSeconds(5)).failWith(new RuntimeException("timeout"));

}
    
@GET
@Path("/send")
public void test() {
  bus.publish("test", "send test event");
}

请确保使用io.vertx.mutiny.core.eventbus.eventbus.eventbus事件总线的变体。

In general, we avoid that kind of pattern and use the request/reply mechanism from the event bus:

@GET
@Path("/send")
public Uni<String> test() {
   return bus.<String>request("test", name)        
        .onItem().transform(Message::body)
        .ifNoItem().after(Duration.ofSeconds(5)).failWith(new RuntimeException("timeout"));
}

When implementing with two endpoints (as in the question), it can become a bit more complicated as if you have multiple calls to the /waitForEvent endpoint, you need to be sure that every "consumer" get the message.

It is still possible, but would will need something like this:

@GET
@Produces(MediaType.TEXT_PLAIN)
@Path("/waitForEvent")
public Uni<String> waitForEvent() {
  return Uni.createFrom().emitter(emitter -> {
    MessageConsumer<String> consumer = bus.consumer("test");
      consumer.handler(m -> {
      emitter.complete(m.body());
      consumer.unregisterAndForget();
   })
        .ifNoItem().after(Duration.ofSeconds(5)).failWith(new RuntimeException("timeout"));

}
    
@GET
@Path("/send")
public void test() {
  bus.publish("test", "send test event");
}

Be sure to use the io.vertx.mutiny.core.eventbus.EventBus variant of the event bus.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文