一、简介

1.1 消费者集群(Queue consumer clusters)

ActiveMQ支持Consumer对消息高可靠性的负载平衡消费,如果一个Consumer死掉,该消息会转发到其它的Consumer消费的Queue上。

如果一个Consumer获得消息比其它Consumer快,那么他将获得更多的消息。

因此推荐ActiveMQ的Broker和Client使用failover://transport的方式来配置链接。


1.2 Broker clusters

大部情况下是使用一系列的Broker和Client链接到一起。如果一个Broker死掉了,Client可以自动链接到其它Broker上。

实现以上行为需要用failover协议作为Client。


如果启动了多个Broker,Client可以使用static discover或者 Dynamic discovery容易的从一个broker到另一个broker直接链接。


这样当一个broker上没有Consumer的话,那么它的消息不会被消费的,然而该broker会通过存储和转发的策略来把该消息发到其它broker上。


特别注意:

  ActiveMQ默认的两个broker,static链接后是单方向的,broker-A可以访问消费broker-B的消息,如果要支持双向通信,需要在

  netWorkConnector配置的时候,设置duplex=true 就可以了。


操作,服务端搭建静态网络连接与消息回流

消息者1:

public void test1() throws Exception {     ConnectionFactory cf = new ActiveMQConnectionFactory("liuy","123456","tcp://192.168.175.13:61676");     Connection connection = cf.createConnection();     connection.start();     final Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);     Destination destination = session.createQueue("my-queue");     for (int i = 0; i < 1; i++) {         MessageConsumer consumer = session.createConsumer(destination);         consumer.setMessageListener(new MessageListener() {     @Override     public void onMessage(Message message) {         TextMessage m = (TextMessage) message;         try {             System.out.println("===收到11111111:" + m.getText());     session.commit();         } catch (JMSException e) {             e.printStackTrace();         }         } });     } }

消息者2:

public void test1() throws Exception {     ConnectionFactory cf = new ActiveMQConnectionFactory("liuy","123456","tcp://192.168.175.13:61616");     Connection connection = cf.createConnection();     connection.start();     final Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);     Destination destination = session.createQueue("my-queue");     for (int i = 0; i < 1; i++) {         MessageConsumer consumer = session.createConsumer(destination);         consumer.setMessageListener(new MessageListener() {     @Override     public void onMessage(Message message) {         TextMessage m = (TextMessage) message;         try {             System.out.println("===收到222222222:" + m.getText());     session.commit();         } catch (JMSException e) {             e.printStackTrace();         }         } });     } }

生产者:

public void test1() throws Exception {     ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("liuy","123456","failover:(tcp://192.168.175.13:61616,tcp://192.168.175.13:61676)");     Connection connection = connectionFactory.createConnection();     connection.start();     Session session = connection.createSession(Boolean.TRUE,Session.AUTO_ACKNOWLEDGE);     Destination destination = session.createQueue("my-queue");     MessageProducer producer = session.createProducer(destination);     for (int i = 0; i < 30; i++) {         TextMessage message = session.createTextMessage("message--" + i); Thread.sleep(1000); producer.send(message);     }     session.commit();     session.close();     connection.close(); }

效果:

    


二、主从节点的集群(Master Slave)

在5.9的版本里面,废除了Pure Master Slave的方式,目前支持:

  1:Shared File System Master Slave:

    基于共享储存的Master-Slave:多个broker实例使用一个存储文件,谁拿到文件锁就是master,其他处于待启动状态,如果master挂掉了,

    某个抢到文件锁的slave变成master

  2:JDBC Master Slave:基于JDBC的Master-Slave:使用同一个数据库,拿到LOCK表的写锁的broker成为master

  3:Replicated LevelDB Store:基于ZooKeeper复制LevelDB存储的Master-Slave机制,这个是5.9新加的


具体的可以到官方察看: http://activemq.apache.org/masterslave.html 


注意:这里可以不要静态连接与回流了


2.1 JDBC Master Slave的方式

2.1.1 简介

利用数据库作为数据源,采用Master/Slave模式,其中在启动的时候Master首先获得独有锁,其它Slaves Broker则等待获取独有锁。

推荐客户端使用Failover来链接Brokers。


具体如下图所示:

  

Master失败

  如果Master失败,则它释放独有锁,其他Slaver则获取独有锁,其它Slaver立即获得独有锁后此时它将变成Master,并且启动所有的传输链接。

  同时,Client将停止链接之前的Master和将会轮询链接到其他可以利用的Broker即新Master。如上中图所示

Master重启

  任何时候去启动新的Broker,即作为新的Slave来加入集群,如上右图所示


2.1.2 JDBC Master Slave的配置

使用<jdbcPersistenceAdapter/>来配置消息的持久化,自动就会使用JDBC Master Slave的方式。

参考:ActiveMQ消息存储持久化 里的jdbc

去掉静态连接:参考ActiveMQ的静态网络链接

去掉回流:参考集群下的消息回流功能


注意:在配置JDBC时,注意配置useDatabaseLock="true",如下

    <jdbcPersistenceAdapter dataSource="#mysql-ds" useDatabaseLock="true" />

必需设置,不然在保存数据时会报数据主键重复异常


2.1.3 测试

生产30个消息:

public void test1() throws Exception {     ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("liuy","123456","failover:(tcp://192.168.175.13:61616,tcp://192.168.175.13:61676)");     Connection connection = connectionFactory.createConnection();     connection.start();     Session session = connection.createSession(Boolean.TRUE,Session.AUTO_ACKNOWLEDGE);     Destination destination = session.createQueue("my-queue");     MessageProducer producer = session.createProducer(destination);     for (int i = 0; i < 30; i++) {         TextMessage message = session.createTextMessage("message--" + i); Thread.sleep(1000); producer.send(message);     }     session.commit();     session.close();     connection.close(); }

  


消费3个消息:

public void test1() throws Exception {     ConnectionFactory cf = new ActiveMQConnectionFactory("liuy","123456","failover:(tcp://192.168.175.13:61616,tcp://192.168.175.13:61676)");     Connection connection = cf.createConnection();     connection.start();     final Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);     Destination destination = session.createQueue("my-queue");     MessageConsumer consumer = session.createConsumer(destination);     int i=0;     while(i<3) {         i++; TextMessage message = (TextMessage) consumer.receive(); session.commit(); System.out.println("收到消 息:" + message.getText());     }     session.close();     connection.close(); }

  


然后关闭61616这台mq,进入到61676界面: