Activemq+zk HA集群

发布于 2021-12-04 13:49:19 字数 14305 浏览 867 评论 3

在一个台win7单机中,部署三个zk实例和三个mq实例。

往mq中发送10W+消息,启动监听器监听消息队列,在消费行为进行到2w+的时候,抛出异常,如下:

org.springframework.jms.listener.DefaultMessageListenerContainer handleListenerSetupFailure
警告: Setup of JMS message listener invoker failed for destination 'queue://textQueueMq' - trying to recover. Cause: Could not open JDBC Connection for transaction; nested exception is com.mysql.jdbc.exceptions.jdbc4.CommunicationsException: The driver was unable to create a connection due to an inability to establish the client portion of a socket.

 

 

spring 消费者配置如下:

<!--生产者配置-->
<bean id="consumerTargetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
    <property name="brokerURL" value="failover:(tcp://localhost:61616,tcp://localhost:61626,tcp://localhost:61636)?jms.prefetchPolicy.queuePrefetch=10"/>
    <property name="userName"  value="admin"/>
    <property name="password"  value="admin"/>
    <property name="useAsyncSend" value="true"/>
    <property name="maxThreadPoolSize" value="100"/>
</bean>

<!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory -->
<bean id="consumerConnectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
    <property name="targetConnectionFactory" ref="consumerTargetConnectionFactory"/>
    <property name="sessionCacheSize" value="100" />
</bean>


<!--配置多个监听器-->
<bean id="consumerMessageListener2" class="com.hispeed.listener.ConsumerMessageListener2"/>

<!--配置监听容器-->
<bean id="TestContainer2" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
    <property name="connectionFactory" ref="consumerConnectionFactory" />
    <property name="destination" ref="textQueueTTT"/>
    <property name="messageListener" ref="consumerMessageListener2"/>
    <!--同时启动多个消费者实例来提高消费的效率-->
    <!--<property name="concurrentConsumers" value="10"/>-->
    <property name="concurrency" value="5-10"/>
    <property name="sessionTransacted" value="true"/>
</bean>

 

mq配置如下:

<!--
    Licensed to the Apache Software Foundation (ASF) under one or more
    contributor license agreements.  See the NOTICE file distributed with
    this work for additional information regarding copyright ownership.
    The ASF licenses this file to You under the Apache License, Version 2.0
    (the "License"); you may not use this file except in compliance with
    the License.  You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
-->
<!-- START SNIPPET: example -->
<beans
  xmlns="http://www.springframework.org/schema/beans"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
  http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">

    <!-- Allows us to use system properties as variables in this configuration file -->
    <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
        <property name="locations">
            <value>file:${activemq.conf}/credentials.properties</value>
        </property>
    </bean>

   <!-- Allows accessing the server log -->
    <bean id="logQuery" class="io.fabric8.insight.log.log4j.Log4jLogQuery"
          lazy-init="false" scope="singleton"
          init-method="start" destroy-method="stop">
    </bean>

    <!--
        The <broker> element is used to configure the ActiveMQ broker.
    -->
    <broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="${activemq.data}">

        <destinationPolicy>
            <policyMap>
              <policyEntries>
                <policyEntry topic=">" >
                    <!-- The constantPendingMessageLimitStrategy is used to prevent
                         slow topic consumers to block producers and affect other consumers
                         by limiting the number of messages that are retained
                         For more information, see:

                         http://activemq.apache.org/slow-consumer-handling.html

                    -->
                  <pendingMessageLimitStrategy>
                    <constantPendingMessageLimitStrategy limit="1000"/>
                  </pendingMessageLimitStrategy>
                </policyEntry>
              </policyEntries>
            </policyMap>
        </destinationPolicy>


        <!--
            The managementContext is used to configure how ActiveMQ is exposed in
            JMX. By default, ActiveMQ uses the MBean server that is started by
            the JVM. For more information, see:

            http://activemq.apache.org/jmx.html
        -->
        <managementContext>
            <managementContext createConnector="false"/>
        </managementContext>

        <!--
            Configure message persistence for the broker. The default persistence
            mechanism is the KahaDB store (identified by the kahaDB tag).
            For more information, see:

            http://activemq.apache.org/persistence.html
        -->
        
        <!--
        <persistenceAdapter>
            <kahaDB directory="${activemq.data}/kahadb"/>
        </persistenceAdapter>
                -->
                
                <!--zookeeper-->
                
                <persistenceAdapter>
                <replicatedLevelDB
                  directory="${activemq.data}/leveldb"
                  replicas="3"
                  bind="tcp://0.0.0.0:0"
                  zkAddress="127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183"
                  hostname="127.0.0.1"
                  sync="local_disk"
                  zkPath="/activemq/leveldb-stores"
                  />
            </persistenceAdapter>
            

          <!--
            The systemUsage controls the maximum amount of space the broker will
            use before disabling caching and/or slowing down producers. For more information, see:
            http://activemq.apache.org/producer-flow-control.html
          -->
          <systemUsage>
            <systemUsage>
                <memoryUsage>
                    <memoryUsage percentOfJvmHeap="70" />
                </memoryUsage>
                <storeUsage>
                    <storeUsage limit="100 gb"/>
                </storeUsage>
                <tempUsage>
                    <tempUsage limit="50 gb"/>
                </tempUsage>
            </systemUsage>
        </systemUsage>

        <!--
            The transport connectors expose ActiveMQ over a given protocol to
            clients and other brokers. For more information, see:

            http://activemq.apache.org/configuring-transports.html
        -->
        
        <!--
        <networkConnectors>
                <networkConnector uri="masterslave:(tcp://0.0.0.0:61617,tcp://0.0.0.0:61618)" duplex="false" prefetchSize="1"/>
                </networkConnectors>
                -->

        <transportConnectors>
            <!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB -->
            <transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
            <transportConnector name="amqp" uri="amqp://0.0.0.0:5672?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
            <transportConnector name="stomp" uri="stomp://0.0.0.0:61613?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
            <transportConnector name="mqtt" uri="mqtt://0.0.0.0:1883?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
            <transportConnector name="ws" uri="ws://0.0.0.0:61614?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
        </transportConnectors>

        <!-- destroy the spring context on shutdown to stop jetty -->
        <shutdownHooks>
            <bean xmlns="http://www.springframework.org/schema/beans" class="org.apache.activemq.hooks.SpringContextHook" />
        </shutdownHooks>

    </broker>

    <!--
        Enable web consoles, REST and Ajax APIs and demos
        The web consoles requires by default login, you can disable this in the jetty.xml file

        Take a look at ${ACTIVEMQ_HOME}/conf/jetty.xml for more details
    -->
    <import resource="jetty.xml"/>

</beans>
<!-- END SNIPPET: example -->

 

请教mq大牛,看看是不是配置有问题?

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(3

尐偏执 2021-12-06 14:36:22

谢谢 tinshen的提醒,的确是mysql数据配置的问题,之前我用的是jdbc自带的数据源,我现在换成了dbcp数据源之后就没有上面的问题了,虽然不知道这两者有什么联系,但是问题是解决了。

柳絮泡泡 2021-12-06 11:47:20

mq不应该跟数据库配置有关系吧?

筱武穆 2021-12-04 23:55:03

trying to recover. Cause: Could not open JDBC Connection for transaction; nested exception is com.mysql.jdbc.exceptions.jdbc4.CommunicationsException: The driver was unable to create a connection due to an inability to establish the client portion of a socket.

数据库那里的代码检查一样。

为什么没办法创建数据库连接了。

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文