如何修复errorcallbacknotimplement-> comcellationException:在反应性爪哇中处置?

发布于 2025-01-21 22:03:16 字数 1976 浏览 0 评论 0 原文

  1. 我正在进行测试,以了解如何发送通知。这是 已发送,我获得了测试结果状态:status_ok
  2. gatewayrsocketclient通过protobuff rpc方法创建的
@Test
public void sendRequestToService() {
    
    RSocket socket = RSocketConnector
        .connectWith(TcpClientTransport.create("127.0.0.1", 7576))
        .block();
    
    Mono<GatewayResponse> res = new GatewayRSocketClient(socket)
        .requestResponse(GatewayRequest.newBuilder()
        .setMethod(Method.METHOD_POST)
        .setUri("/notification/save-token")
        .putParams("token", "12345678")
        .build());
    
    System.out.println("STATUS: " + res.block().getStatus());    
}

这是要调用的方法,并进一步向下链条,直到将消息发送到firebase

@Override
public Mono<GatewayResponse> requestResponse(GatewayRequest message, ByteBuf metadata)
{
    return serv.sendMessage(new NotificationDTO("first notification", Event.CREDITING, "1234567"))
        .map(res -> GatewayResponse.newBuilder()
        .setStatus(res ? Status.STATUS_OK : Status.STATUS_NOT_FOUND)
        .build());
}

the firebase该测试成功运行,但我得到了很长的stacktrace。

我搜索了解决方案,没有成功。流动被抛出,显然是由于例外。 如何修复?

reactor.core.Exceptions$ErrorCallbackNotImplemented: java.util.concurrent.CancellationException: Disposed
Caused by: java.util.concurrent.CancellationException: Disposed
    at io.rsocket.internal.UnboundedProcessor.dispose(UnboundedProcessor.java:550) ~[rsocket-core-1.1.1.jar:na]
    at io.rsocket.transport.netty.TcpDuplexConnection.doOnClose(TcpDuplexConnection.java:67) ~[rsocket-transport-netty-1.1.1.jar:na]
    at io.rsocket.internal.BaseDuplexConnection.lambda$new$0(BaseDuplexConnection.java:30) ~[rsocket-core-1.1.1.jar:na]

问题在于插座在测试中停止,因为测试结束并且因此抛出了例外。问题是如何正确关闭套筒。我没有在

  1. I am running a test to see how the notification will be sent. It is
    sent and I get the test result STATUS: STATUS_OK
  2. GatewayRSocketClient created via protobuff rpc method
@Test
public void sendRequestToService() {
    
    RSocket socket = RSocketConnector
        .connectWith(TcpClientTransport.create("127.0.0.1", 7576))
        .block();
    
    Mono<GatewayResponse> res = new GatewayRSocketClient(socket)
        .requestResponse(GatewayRequest.newBuilder()
        .setMethod(Method.METHOD_POST)
        .setUri("/notification/save-token")
        .putParams("token", "12345678")
        .build());
    
    System.out.println("STATUS: " + res.block().getStatus());    
}

This is the method to be called and further down the chain until the message is sent to firebase

@Override
public Mono<GatewayResponse> requestResponse(GatewayRequest message, ByteBuf metadata)
{
    return serv.sendMessage(new NotificationDTO("first notification", Event.CREDITING, "1234567"))
        .map(res -> GatewayResponse.newBuilder()
        .setStatus(res ? Status.STATUS_OK : Status.STATUS_NOT_FOUND)
        .build());
}

The test is running successfully but I am getting a long stacktrace.

I have searched for solutions and without success. The flow is thrown and apparently because of the exception.
How can this be fixed?

reactor.core.Exceptions$ErrorCallbackNotImplemented: java.util.concurrent.CancellationException: Disposed
Caused by: java.util.concurrent.CancellationException: Disposed
    at io.rsocket.internal.UnboundedProcessor.dispose(UnboundedProcessor.java:550) ~[rsocket-core-1.1.1.jar:na]
    at io.rsocket.transport.netty.TcpDuplexConnection.doOnClose(TcpDuplexConnection.java:67) ~[rsocket-transport-netty-1.1.1.jar:na]
    at io.rsocket.internal.BaseDuplexConnection.lambda$new$0(BaseDuplexConnection.java:30) ~[rsocket-core-1.1.1.jar:na]

The problem is that the socket stops in the test, because the test ends and an exception is thrown because of this. The question is how to close the socket correctly. I don't see suitable methods on RSocket.

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。
列表为空,暂无数据
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文