如何避免数据随客户端连接数量的增加而重复两倍或三倍

发布于 2024-11-02 08:10:52 字数 8527 浏览 0 评论 0原文

我正在使用 BlazeDS 的发布/订阅机制和 flex_sdk_4.1 开发一个 Flex 应用程序,该程序用数据网格和图表显示由 Esper 生成的实时事件。用 ActionScript 编写的 messageHandler() 负责接收通过 JMS 传递的事件。在 Tomcat 服务器中运行时,单个连接下一切正常;但如果同时增加客户端连接数,事件就会按连接数变成两倍或三倍。我需要显示不重复的数据。谁能帮我修复这个错误?提前致谢。以下是 Flex 配置文件,供您参考。

Messaging-config.xml

<?xml version="1.0" encoding="UTF-8"?>
<!-- BlazeDS message service: declares the messaging adapters, the default
     channel, and the "sensordata" destination. -->
<service
    id="message-service"
    class="flex.messaging.services.MessageService">

    <adapters>
        <!-- Marked default="true", so it is the default adapter of this service. -->
        <adapter-definition
            id="actionscript"
            class="flex.messaging.services.messaging.adapters.ActionScriptAdapter"
            default="true"/>
        <!-- JMS adapter, currently disabled:
        <adapter-definition id="jms" class="flex.messaging.services.messaging.adapters.JMSAdapter"/>
        -->
    </adapters>

    <default-channels>
        <!-- Refers to a channel by id; the channel itself is defined under
             <channels> in the top-level services configuration. -->
        <channel ref="my-longpolling-amf"/>
    </default-channels>

    <destination id="sensordata">
        <properties>
            <server>
                <allow-subtopics>true</allow-subtopics>
            </server>
        </properties>
    </destination>

</service>

Remoting-config.xml

<?xml version="1.0" encoding="UTF-8"?>
<!-- BlazeDS remoting service: declares the Java adapter, the default channel,
     and the "Esperjms" remoting destination. -->
<service id="remoting-service"
    class="flex.messaging.services.RemotingService">
    <adapters>
        <adapter-definition id="java-object" class="flex.messaging.services.remoting.adapters.JavaAdapter" default="true"/>
    </adapters>
    <default-channels>
        <channel ref="my-amf"/>
    </default-channels>
    <!-- <source> holds the fully qualified name of the Java class exposed
         through this destination. -->
    <destination id="Esperjms">
        <properties>
            <source>jmsesper.flexserver.jmsConsumer.jmsConsumer</source>
        </properties>
    </destination>
<!-- The closing tag below was missing from the original snippet, which left
     the document not well-formed. -->
</service>

services-config.xml

<?xml version="1.0" encoding="UTF-8"?>
<!-- Top-level BlazeDS configuration: included service files, security,
     channel definitions, logging, and redeploy settings. -->
<services-config>

    <!-- Per-service configuration lives in separate files included here. -->
    <services>
        <service-include file-path="remoting-config.xml"/>
        <service-include file-path="proxy-config.xml"/>
        <service-include file-path="messaging-config.xml"/>
    </services>

    <security>
        <login-command
            class="flex.messaging.security.TomcatLoginCommand"
            server="Tomcat"/>
        <!-- Uncomment the correct app server
        <login-command class="flex.messaging.security.TomcatLoginCommand" server="JBoss">
        <login-command class="flex.messaging.security.JRunLoginCommand" server="JRun"/>
        <login-command class="flex.messaging.security.WeblogicLoginCommand" server="Weblogic"/>
        <login-command class="flex.messaging.security.WebSphereLoginCommand" server="WebSphere"/>
        -->
        <!--
        <security-constraint id="basic-read-access">
            <auth-method>Basic</auth-method>
            <roles>
                <role>guests</role>
                <role>accountants</role>
                <role>employees</role>
                <role>managers</role>
            </roles>
        </security-constraint>
        -->
    </security>

    <channels>

        <!-- AMF channel with no extra properties; referenced as "my-amf". -->
        <channel-definition id="my-amf" class="mx.messaging.channels.AMFChannel">
            <endpoint
                url="http://{server.name}:{server.port}/{context.root}/messagebroker/amf"
                class="flex.messaging.endpoints.AMFEndpoint"/>
        </channel-definition>

        <!-- HTTPS variant of the AMF channel. -->
        <channel-definition id="my-secure-amf" class="mx.messaging.channels.SecureAMFChannel">
            <endpoint
                url="https://{server.name}:{server.port}/{context.root}/messagebroker/amfsecure"
                class="flex.messaging.endpoints.SecureAMFEndpoint"/>
            <properties>
                <add-no-cache-headers>false</add-no-cache-headers>
            </properties>
        </channel-definition>

        <!-- AMF channel with polling enabled and a 1 second polling interval. -->
        <channel-definition id="my-polling-amf" class="mx.messaging.channels.AMFChannel">
            <endpoint
                url="http://{server.name}:{server.port}/{context.root}/messagebroker/amfpolling"
                class="flex.messaging.endpoints.AMFEndpoint"/>
            <properties>
                <polling-enabled>true</polling-enabled>
                <polling-interval-seconds>1</polling-interval-seconds>
            </properties>
        </channel-definition>

        <!-- Streaming AMF channel with per-browser connection settings. -->
        <channel-definition id="my-streaming-amf" class="mx.messaging.channels.StreamingAMFChannel">
            <endpoint
                url="http://{server.name}:{server.port}/{context.root}/messagebroker/streamingamf"
                class="flex.messaging.endpoints.StreamingAMFEndpoint"/>
            <properties>
                <user-agent-settings>
                    <user-agent match-on="MSIE" kickstart-bytes="2048" max-streaming-connections-per-session="1"/>
                    <user-agent match-on="Firefox" kickstart-bytes="0" max-streaming-connections-per-session="10"/>
                    <user-agent match-on="Chrome" max-streaming-connections-per-session="5"/>
                </user-agent-settings>
            </properties>
        </channel-definition>

        <!-- AMF channel with polling enabled: 5 second polling interval,
             60000 ms wait interval, at most 200 waiting poll requests. -->
        <channel-definition id="my-longpolling-amf" class="mx.messaging.channels.AMFChannel">
            <endpoint
                url="http://{server.name}:{server.port}/{context.root}/messagebroker/amflongpolling"
                class="flex.messaging.endpoints.AMFEndpoint"/>
            <properties>
                <polling-enabled>true</polling-enabled>
                <polling-interval-seconds>5</polling-interval-seconds>
                <wait-interval-millis>60000</wait-interval-millis>
                <client-wait-interval-millis>1</client-wait-interval-millis>
                <max-waiting-poll-requests>200</max-waiting-poll-requests>
                <!-- NOTE(review): the attribute below is named for streaming
                     connections; confirm it has any effect on this
                     non-streaming AMFEndpoint. -->
                <user-agent-settings>
                    <user-agent match-on="Chrome" max-streaming-connections-per-session="5"/>
                </user-agent-settings>
            </properties>
        </channel-definition>

        <!--
        <channel-definition id="my-http" class="mx.messaging.channels.HTTPChannel">
            <endpoint url="http://{server.name}:{server.port}/{context.root}/messagebroker/http" class="flex.messaging.endpoints.HTTPEndpoint"/>
        </channel-definition>
        <channel-definition id="my-secure-http" class="mx.messaging.channels.SecureHTTPChannel">
            <endpoint url="https://{server.name}:{server.port}/{context.root}/messagebroker/httpsecure" class="flex.messaging.endpoints.SecureHTTPEndpoint"/>
            <properties>
                <add-no-cache-headers>false</add-no-cache-headers>
            </properties>
        </channel-definition>
        -->

    </channels>

    <!-- Console logging at level "Error", filtered to the categories below. -->
    <logging>
        <target class="flex.messaging.log.ConsoleTarget" level="Error">
            <properties>
                <prefix>[BlazeDS] </prefix>
                <includeDate>false</includeDate>
                <includeTime>false</includeTime>
                <includeLevel>false</includeLevel>
                <includeCategory>false</includeCategory>
            </properties>
            <filters>
                <pattern>Endpoint.*</pattern>
                <pattern>Service.*</pattern>
                <pattern>Configuration</pattern>
            </filters>
        </target>
    </logging>

    <system>
        <!-- Redeploy is switched off; the watch/touch entries stay commented out. -->
        <redeploy>
            <enabled>false</enabled>
            <!--
            <watch-interval>20</watch-interval>
            <watch-file>{context.root}/WEB-INF/flex/services-config.xml</watch-file>
            <watch-file>{context.root}/WEB-INF/flex/proxy-config.xml</watch-file>
            <watch-file>{context.root}/WEB-INF/flex/remoting-config.xml</watch-file>
            <watch-file>{context.root}/WEB-INF/flex/messaging-config.xml</watch-file>
            <watch-file>{context.root}/WEB-INF/flex/data-management-config.xml</watch-file>
            <touch-file>{context.root}/WEB-INF/web.xml</touch-file>
            -->
        </redeploy>
    </system>

</services-config>

I'm using the BlazeDS pub/sub methodology and flex_sdk_4.1 to develop a Flex application that displays a data grid and chart for real-time events generated by Esper. The messageHandler() written in ActionScript retrieves the events passed through JMS. When I run this application in Tomcat, it works well for one connection; however, if I increase the number of simultaneous client connections, the events get doubled or tripled according to the number of connections. I need to display the data without duplicates. Can anyone help me fix this bug? Thanks in advance.
The following are the Flex config files for your reference.

Messaging-config.xml:

<?xml version="1.0" encoding="UTF-8"?>
<!-- BlazeDS message service: declares the messaging adapters, the default
     channel, and the "sensordata" destination. -->
<service
    id="message-service"
    class="flex.messaging.services.MessageService">

    <adapters>
        <!-- Marked default="true", so it is the default adapter of this service. -->
        <adapter-definition
            id="actionscript"
            class="flex.messaging.services.messaging.adapters.ActionScriptAdapter"
            default="true"/>
        <!-- JMS adapter, currently disabled:
        <adapter-definition id="jms" class="flex.messaging.services.messaging.adapters.JMSAdapter"/>
        -->
    </adapters>

    <default-channels>
        <!-- Refers to a channel by id; the channel itself is defined under
             <channels> in the top-level services configuration. -->
        <channel ref="my-longpolling-amf"/>
    </default-channels>

    <destination id="sensordata">
        <properties>
            <server>
                <allow-subtopics>true</allow-subtopics>
            </server>
        </properties>
    </destination>

</service>

Remoting-config.xml:

<?xml version="1.0" encoding="UTF-8"?>
<!-- BlazeDS remoting service: declares the Java adapter, the default channel,
     and the "Esperjms" remoting destination. -->
<service id="remoting-service"
    class="flex.messaging.services.RemotingService">
    <adapters>
        <adapter-definition id="java-object" class="flex.messaging.services.remoting.adapters.JavaAdapter" default="true"/>
    </adapters>
    <default-channels>
        <channel ref="my-amf"/>
    </default-channels>
    <!-- <source> holds the fully qualified name of the Java class exposed
         through this destination. -->
    <destination id="Esperjms">
        <properties>
            <source>jmsesper.flexserver.jmsConsumer.jmsConsumer</source>
        </properties>
    </destination>
<!-- The closing tag below was missing from the original snippet, which left
     the document not well-formed. -->
</service>

services-config.xml:

<?xml version="1.0" encoding="UTF-8"?>
<!-- Top-level BlazeDS configuration: included service files, security,
     channel definitions, logging, and redeploy settings. -->
<services-config>

    <!-- Per-service configuration lives in separate files included here. -->
    <services>
        <service-include file-path="remoting-config.xml"/>
        <service-include file-path="proxy-config.xml"/>
        <service-include file-path="messaging-config.xml"/>
    </services>

    <security>
        <login-command
            class="flex.messaging.security.TomcatLoginCommand"
            server="Tomcat"/>
        <!-- Uncomment the correct app server
        <login-command class="flex.messaging.security.TomcatLoginCommand" server="JBoss">
        <login-command class="flex.messaging.security.JRunLoginCommand" server="JRun"/>
        <login-command class="flex.messaging.security.WeblogicLoginCommand" server="Weblogic"/>
        <login-command class="flex.messaging.security.WebSphereLoginCommand" server="WebSphere"/>
        -->
        <!--
        <security-constraint id="basic-read-access">
            <auth-method>Basic</auth-method>
            <roles>
                <role>guests</role>
                <role>accountants</role>
                <role>employees</role>
                <role>managers</role>
            </roles>
        </security-constraint>
        -->
    </security>

    <channels>

        <!-- AMF channel with no extra properties; referenced as "my-amf". -->
        <channel-definition id="my-amf" class="mx.messaging.channels.AMFChannel">
            <endpoint
                url="http://{server.name}:{server.port}/{context.root}/messagebroker/amf"
                class="flex.messaging.endpoints.AMFEndpoint"/>
        </channel-definition>

        <!-- HTTPS variant of the AMF channel. -->
        <channel-definition id="my-secure-amf" class="mx.messaging.channels.SecureAMFChannel">
            <endpoint
                url="https://{server.name}:{server.port}/{context.root}/messagebroker/amfsecure"
                class="flex.messaging.endpoints.SecureAMFEndpoint"/>
            <properties>
                <add-no-cache-headers>false</add-no-cache-headers>
            </properties>
        </channel-definition>

        <!-- AMF channel with polling enabled and a 1 second polling interval. -->
        <channel-definition id="my-polling-amf" class="mx.messaging.channels.AMFChannel">
            <endpoint
                url="http://{server.name}:{server.port}/{context.root}/messagebroker/amfpolling"
                class="flex.messaging.endpoints.AMFEndpoint"/>
            <properties>
                <polling-enabled>true</polling-enabled>
                <polling-interval-seconds>1</polling-interval-seconds>
            </properties>
        </channel-definition>

        <!-- Streaming AMF channel with per-browser connection settings. -->
        <channel-definition id="my-streaming-amf" class="mx.messaging.channels.StreamingAMFChannel">
            <endpoint
                url="http://{server.name}:{server.port}/{context.root}/messagebroker/streamingamf"
                class="flex.messaging.endpoints.StreamingAMFEndpoint"/>
            <properties>
                <user-agent-settings>
                    <user-agent match-on="MSIE" kickstart-bytes="2048" max-streaming-connections-per-session="1"/>
                    <user-agent match-on="Firefox" kickstart-bytes="0" max-streaming-connections-per-session="10"/>
                    <user-agent match-on="Chrome" max-streaming-connections-per-session="5"/>
                </user-agent-settings>
            </properties>
        </channel-definition>

        <!-- AMF channel with polling enabled: 5 second polling interval,
             60000 ms wait interval, at most 200 waiting poll requests. -->
        <channel-definition id="my-longpolling-amf" class="mx.messaging.channels.AMFChannel">
            <endpoint
                url="http://{server.name}:{server.port}/{context.root}/messagebroker/amflongpolling"
                class="flex.messaging.endpoints.AMFEndpoint"/>
            <properties>
                <polling-enabled>true</polling-enabled>
                <polling-interval-seconds>5</polling-interval-seconds>
                <wait-interval-millis>60000</wait-interval-millis>
                <client-wait-interval-millis>1</client-wait-interval-millis>
                <max-waiting-poll-requests>200</max-waiting-poll-requests>
                <!-- NOTE(review): the attribute below is named for streaming
                     connections; confirm it has any effect on this
                     non-streaming AMFEndpoint. -->
                <user-agent-settings>
                    <user-agent match-on="Chrome" max-streaming-connections-per-session="5"/>
                </user-agent-settings>
            </properties>
        </channel-definition>

        <!--
        <channel-definition id="my-http" class="mx.messaging.channels.HTTPChannel">
            <endpoint url="http://{server.name}:{server.port}/{context.root}/messagebroker/http" class="flex.messaging.endpoints.HTTPEndpoint"/>
        </channel-definition>
        <channel-definition id="my-secure-http" class="mx.messaging.channels.SecureHTTPChannel">
            <endpoint url="https://{server.name}:{server.port}/{context.root}/messagebroker/httpsecure" class="flex.messaging.endpoints.SecureHTTPEndpoint"/>
            <properties>
                <add-no-cache-headers>false</add-no-cache-headers>
            </properties>
        </channel-definition>
        -->

    </channels>

    <!-- Console logging at level "Error", filtered to the categories below. -->
    <logging>
        <target class="flex.messaging.log.ConsoleTarget" level="Error">
            <properties>
                <prefix>[BlazeDS] </prefix>
                <includeDate>false</includeDate>
                <includeTime>false</includeTime>
                <includeLevel>false</includeLevel>
                <includeCategory>false</includeCategory>
            </properties>
            <filters>
                <pattern>Endpoint.*</pattern>
                <pattern>Service.*</pattern>
                <pattern>Configuration</pattern>
            </filters>
        </target>
    </logging>

    <system>
        <!-- Redeploy is switched off; the watch/touch entries stay commented out. -->
        <redeploy>
            <enabled>false</enabled>
            <!--
            <watch-interval>20</watch-interval>
            <watch-file>{context.root}/WEB-INF/flex/services-config.xml</watch-file>
            <watch-file>{context.root}/WEB-INF/flex/proxy-config.xml</watch-file>
            <watch-file>{context.root}/WEB-INF/flex/remoting-config.xml</watch-file>
            <watch-file>{context.root}/WEB-INF/flex/messaging-config.xml</watch-file>
            <watch-file>{context.root}/WEB-INF/flex/data-management-config.xml</watch-file>
            <touch-file>{context.root}/WEB-INF/web.xml</touch-file>
            -->
        </redeploy>
    </system>

</services-config>

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(1)

淡写薰衣草的香 2024-11-09 08:10:52

该应用程序集成了 ActiveMQ,它实现了发布/订阅机制。在原有实现中,会话创建的是"主题"(topic)目标:事件入队后,会按消费者建立的连接数量被重复出队两次或三次。

// Connect to the ActiveMQ broker at its default URL and start the connection.
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_BROKER_URL);
javax.jms.Connection connection = connectionFactory.createConnection();
connection.start();

// Non-transacted session with automatic acknowledgement.
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// Problematic variant: despite its name, 'TESTQUEUE' is created as a TOPIC here.
// A topic delivers a copy of each message to every subscriber, which is why the
// events were duplicated once per consumer connection.
Destination destination = session.createTopic("TESTQUEUE");
// MessageProducer is used for sending messages (as opposed to MessageConsumer which is used for receiving them)
producer = session.createProducer(destination);

改为由会话创建队列(queue)目标后,此错误得以解决,如下所示。生产者和消费者文件中都做了这一更改。

// Connect to the ActiveMQ broker at its default URL and start the connection.
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_BROKER_URL);
javax.jms.Connection connection = connectionFactory.createConnection();
connection.start();

// Non-transacted session with automatic acknowledgement.
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// Fixed variant: 'TESTQUEUE' is created as a QUEUE. A queue hands each message
// to a single consumer, so events are no longer duplicated per connection.
Destination destination = session.createQueue("TESTQUEUE");
// MessageProducer is used for sending messages (as opposed to MessageConsumer which is used for receiving them)
producer = session.createProducer(destination);

The application integrates ActiveMQ, which implements the pub/sub methodology. In the existing scenario the session created a 'topic' destination, so each enqueued event was dequeued two or three times, according to the number of connections the consumer had opened.

// Connect to the ActiveMQ broker at its default URL and start the connection.
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_BROKER_URL);
javax.jms.Connection connection = connectionFactory.createConnection();
connection.start();

// Non-transacted session with automatic acknowledgement.
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// Problematic variant: despite its name, 'TESTQUEUE' is created as a TOPIC here.
// A topic delivers a copy of each message to every subscriber, which is why the
// events were duplicated once per consumer connection.
Destination destination = session.createTopic("TESTQUEUE");
// MessageProducer is used for sending messages (as opposed to MessageConsumer which is used for receiving them)
producer = session.createProducer(destination);

This bug was resolved when the session created a queue destination instead, as follows. The change was made in both the producer and consumer files.

// Connect to the ActiveMQ broker at its default URL and start the connection.
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_BROKER_URL);
javax.jms.Connection connection = connectionFactory.createConnection();
connection.start();

// Non-transacted session with automatic acknowledgement.
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// Fixed variant: 'TESTQUEUE' is created as a QUEUE. A queue hands each message
// to a single consumer, so events are no longer duplicated per connection.
Destination destination = session.createQueue("TESTQUEUE");
// MessageProducer is used for sending messages (as opposed to MessageConsumer which is used for receiving them)
producer = session.createProducer(destination);
~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文