BlazeDS+ActiveMQ:Flex 客户端与持久主题的非正常断开不会将其从 ActiveMQ 中删除
我正在尝试使用 BlazeDS 的 JMS 桥让基于 Flex 的桌面应用程序通过持久订阅使用来自 ActiveMQ 主题的消息。基本场景如下:
消息是由Flex客户端订阅的主题中的其他生产者生产的。
Flex 客户端可能会不时地离线,但当它再次连接到 BlazeDS 时,它必须收到离线时错过的所有消息。 (当然Flex客户端每次都使用相同的客户端ID连接)。
不能保证 Flex 客户端正常关闭。
不能保证 Flex 客户端正常关闭。
如果我通过调用 disconnect() 显式断开 Flex 端的消费者,一切都会正常工作 - 我在应用程序的退出处理程序中执行此操作。但是,由于上面的#3,不能保证 disconnect()
始终被调用。当 Flex 客户端在没有调用 disconnect()
的情况下关闭时,BlazeDS 创建并关联到 Flex 客户端的“代理 JMS 客户端”的订阅似乎对 ActiveMQ 保持活动状态,因此 ActiveMQ 仍然认为当Flex应用程序下次启动时,它无法登录BlazeDS,因为ActiveMQ拒绝其订阅,声称客户端ID已被占用。为什么会这样?我可以做什么来确保当真正的 Flex 对应项意外终止时,BlazeDS 使 ActiveMQ 中的“代理 JMS 客户端”离线?
更详细的信息:一些调试显示:
BlazeDS 意识到 Flex 客户端的终止,因为它在调试模式下向控制台打印一些异常。消息如下:
[BlazeDS]23:18:13.688 [警告] ID 为“my-streaming-amf”的端点正在关闭与 ID 为“71E6466F-D91F-201C-F60A-A6CB52F95D9F”的 FlexClient 的流连接,因为端点遇到了套接字写入错误,可能是由于 FlexClient 无响应造成的。 ClientAbortException:java.net.SocketException:管道损坏 在 org.apache.catalina.connector.OutputBuffer.doFlush(OutputBuffer.java:319) 在 org.apache.catalina.connector.OutputBuffer.flush(OutputBuffer.java:288) 在 org.apache.catalina.connector.Response.flushBuffer(Response.java:542) 在 org.apache.catalina.connector.ResponseFacade.flushBuffer(ResponseFacade.java:279) 在flex.messaging.endpoints.BaseStreamingHTTPEndpoint.handleFlexClientStreamingOpenRequest(BaseStreamingHTTPEndpoint.java:818) 在flex.messaging.endpoints.BaseStreamingHTTPEndpoint.serviceStreamingRequest(BaseStreamingHTTPEndpoint.java:1055) 在flex.messaging.endpoints.BaseStreamingHTTPEndpoint.service(BaseStreamingHTTPEndpoint.java:460) 在flex.messaging.MessageBrokerServlet.service(MessageBrokerServlet.java:353) 在 javax.servlet.http.HttpServlet.service(HttpServlet.java:803) 在 org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:290) 在 org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:206) 在org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:233) 在org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:175) 在 org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:128) 在 org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:102) 在 org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:109) 在 org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:263) 在 org.apache.coyote.http11.Http11Processor.process(Http11Processor.java:844) 在 org.apache.coyote.http11.Http11Protocol$Http11ConnectionHandler.process(Http11Protocol.java:584) 在 org.apache.tomcat.util.net.JIoEndpoint$Worker.run(JIoEndpoint.java:447) 在 java.lang.Thread.run(Thread.java:680) 引起原因:java.net.SocketException:管道损坏 在 java.net.SocketOutputStream.socketWrite0(本机方法) 在 java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:92) 在 java.net.SocketOutputStream.write(SocketOutputStream.java:136) 在 org.apache.coyote.http11.InternalOutputBuffer.realWriteBytes(InternalOutputBuffer.java:737) 在 org.apache.tomcat.util.buf.ByteChunk.flushBuffer(ByteChunk.java:434) 在 org.apache.coyote.http11.InternalOutputBuffer.flush(InternalOutputBuffer.java:299) 在 org.apache.coyote.http11.Http11Processor.action(Http11Processor.java:963) 在 org.apache.coyote.Response.action(Response.java:183) 在 org.apache.catalina.connector.OutputBuffer.doFlush(OutputBuffer.java:314) ... 20 更多 [BlazeDS]23:18:13.689 [DEBUG] id 为“my-streaming-amf”的端点的流线程“http-8400-1”正在释放连接并返回到请求处理程序池。 [BlazeDS]23:18:13.689 [INFO] ID 为“5BC5E8D604A361BCA673B05AC624CCC1”的 FlexSession 流客户端数量为 0。 [BlazeDS]23:18:13.689 [DEBUG] ID 为“my-streaming-amf”的端点的流客户端数量为 0。
在此阶段,订阅在 ActiveMQ Web 管理界面上仍显示为活动状态。
从控制台使用
kill -9
杀死 BlazeDS(更准确地说,托管它的 Tomcat 服务器)会使 ActiveMQ 立即意识到“代理 JMS 客户端”已经消失,并且在 ActiveMQ 上离线网络管理界面。这让我得出结论,BlazeDS 显式地保持代理 JMS 客户端处于活动状态,因为kill -9
没有机会让 BlazeDS 取消订阅客户端,但它在 ActiveMQ 中仍然处于离线状态。
那么,问题又来了:当真正的 Flex 对应项意外终止时,我可以做什么来确保 BlazeDS 使 ActiveMQ 中的“代理 JMS 客户端”离线?这是 BlazeDS 中的错误还是我只是缺少一些使其工作的隐藏配置设置?
版本信息:BlazeDS 4.0、ActiveMQ 5.5.0,均是今天新下载的。我在 BlazeDS 交钥匙软件中使用 Tomcat 服务器,但 ActiveMQ 是单独安装的,因为 BlazeDS 交钥匙软件仅附带 ActiveMQ 4.1.1。顺便说一句,该版本的 ActiveMQ 也有同样的问题。
I'm trying to make a Flex-based desktop application consume messages from an ActiveMQ topic with a durable subscription, using the JMS bridge of BlazeDS. The basic scenario is as follows:
Messages are produced by other producers in the topic to which the Flex client is subscribed.
The Flex client may go offline from time to time, but it must receive all the messages it has missed while being offline when it connects to BlazeDS again. (Of course the Flex client connects with the same client ID every time).
It can not be guaranteed that the Flex client is shut down gracefully.
Everything works fine if I explicitly disconnect my consumer on the Flex side by calling disconnect()
- I do it in the exit handler of the application. However, due to #3 above, it is not guaranteed that disconnect()
is called all the time. When the Flex client shuts down without calling disconnect()
, it seems that the subscription of the "proxy JMS client" that BlazeDS creates and associates to the Flex client stays active towards ActiveMQ, so ActiveMQ still thinks that the client is logged in. When the Flex app starts up the next time, it is unable to log in to BlazeDS because ActiveMQ refuses its subscription, claiming that the client ID is already taken. Why is it so and what can I do here to ensure that BlazeDS makes the "proxy JMS client" offline in ActiveMQ when its real Flex counterpart terminates unexpectedly?
More detailed information: some debugging revealed that:
BlazeDS becomes aware of the termination of the Flex client because it prints a few exceptions to the console when in debug mode. The messages are as follows:
[BlazeDS]23:18:13.688 [WARN] Endpoint with id 'my-streaming-amf' is closing the streaming connection to FlexClient with id '71E6466F-D91F-201C-F60A-A6CB52F95D9F' because endpoint encountered a socket write error, possibly due to an unresponsive FlexClient. ClientAbortException: java.net.SocketException: Broken pipe at org.apache.catalina.connector.OutputBuffer.doFlush(OutputBuffer.java:319) at org.apache.catalina.connector.OutputBuffer.flush(OutputBuffer.java:288) at org.apache.catalina.connector.Response.flushBuffer(Response.java:542) at org.apache.catalina.connector.ResponseFacade.flushBuffer(ResponseFacade.java:279) at flex.messaging.endpoints.BaseStreamingHTTPEndpoint.handleFlexClientStreamingOpenRequest(BaseStreamingHTTPEndpoint.java:818) at flex.messaging.endpoints.BaseStreamingHTTPEndpoint.serviceStreamingRequest(BaseStreamingHTTPEndpoint.java:1055) at flex.messaging.endpoints.BaseStreamingHTTPEndpoint.service(BaseStreamingHTTPEndpoint.java:460) at flex.messaging.MessageBrokerServlet.service(MessageBrokerServlet.java:353) at javax.servlet.http.HttpServlet.service(HttpServlet.java:803) at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:290) at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:206) at org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:233) at org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:175) at org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:128) at org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:102) at org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:109) at org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:263) at org.apache.coyote.http11.Http11Processor.process(Http11Processor.java:844) at org.apache.coyote.http11.Http11Protocol$Http11ConnectionHandler.process(Http11Protocol.java:584) at org.apache.tomcat.util.net.JIoEndpoint$Worker.run(JIoEndpoint.java:447) at java.lang.Thread.run(Thread.java:680) Caused by: java.net.SocketException: Broken pipe at java.net.SocketOutputStream.socketWrite0(Native Method) at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:92) at java.net.SocketOutputStream.write(SocketOutputStream.java:136) at org.apache.coyote.http11.InternalOutputBuffer.realWriteBytes(InternalOutputBuffer.java:737) at org.apache.tomcat.util.buf.ByteChunk.flushBuffer(ByteChunk.java:434) at org.apache.coyote.http11.InternalOutputBuffer.flush(InternalOutputBuffer.java:299) at org.apache.coyote.http11.Http11Processor.action(Http11Processor.java:963) at org.apache.coyote.Response.action(Response.java:183) at org.apache.catalina.connector.OutputBuffer.doFlush(OutputBuffer.java:314) ... 20 more [BlazeDS]23:18:13.689 [DEBUG] Streaming thread 'http-8400-1' for endpoint with id 'my-streaming-amf' is releasing connection and returning to the request handler pool. [BlazeDS]23:18:13.689 [INFO] Number of streaming clients for FlexSession with id '5BC5E8D604A361BCA673B05AC624CCC1' is 0. [BlazeDS]23:18:13.689 [DEBUG] Number of streaming clients for endpoint with id 'my-streaming-amf' is 0.
At this stage, the subscriptions are still shown on the ActiveMQ web admin interface as being active.
Killing BlazeDS (more precisely, the Tomcat server that hosts it) with
kill -9
from the console makes ActiveMQ realize immediately that the "proxy JMS client" is gone and it becomes offline on the ActiveMQ web admin interface. This made me conclude that BlazeDS is keeping the proxy JMS client alive explicitly sincekill -9
gives no chance to BlazeDS to unsubscribe the client but it still becomes offline in ActiveMQ.
So, the question once again: What can I do here to ensure that BlazeDS makes the "proxy JMS client" offline in ActiveMQ when its real Flex counterpart terminates unexpectedly? Is this a bug in BlazeDS or am I just missing some hidden configuration setting that would make it work?
Version information: BlazeDS 4.0, ActiveMQ 5.5.0, both freshly downloaded today. I'm using the Tomcat server in the BlazeDS turnkey but ActiveMQ is installed separately because the BlazeDS turnkey ships with ActiveMQ 4.1.1 only. By the way, that version of ActiveMQ has the same issue.
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论
评论(1)
问题是 BlazeDS 无法检测到您的 Flex 客户端已关闭,您必须实现自己的机制 - 我的建议是使用通过消息传递实现的心跳。如果一段时间后没有收到来自客户端的消息,您可以假设 Flex 客户端已消失并断开连接(或者您可以使用服务器上的会话超时机制,并在会话过期时断开连接)。
您所看到的(流媒体通道关闭时捕获的异常)不足以百分百确定 Flex 客户端已消失。流式传输是使用永久打开的 HTTP 连接(用于发送服务器消息)和定期的 HTTP post 调用(由客户端发起以发送消息)来实现的。在某些网络中,防火墙可能会在几秒钟后决定终止 HTTP 连接,您将收到与您发布的错误相同的错误。然而,这并不意味着Flex客户端被杀死——在这种情况下,Flex客户端可以使用回退策略并切换到短/长轮询。实际上,如果 BlazeDS 在这种情况下自动断开 JMS,这将是一个错误。
The problem is that there is no way for BlazeDS to detect that your Flex client was shutdown, you will have to implement your own mechanism - my suggestion is to use a heart beat implemented with messaging. If no message is received from the client after a time interval you can assume that the Flex client is gone and do the disconnect (or you can use the session timeout mechanism on the server, and do the disconnect on session expire).
What you have seen (the exception caught when the streaming channel is closed) is not enough to say 100% sure that the Flex client is gone. The streaming is implemented using an HTTP connection kept open forever (used to send server messages) and periodic HTTP post calls (initiated by the client to send messages). In some networks the firewall can decide to kill the HTTP connection after a couple of seconds and you will receive the same error like the one you posted. However, it does not mean that the Flex client is killed - the Flex client can use a fallback strategy and switch to short/long polling in this case. Actually it would be a bug if BlazeDS will automatically do the JMS disconnect in this case.