ActiveMQ(13):ActiveMQ的集群 推荐 原创 我爱大金子 2017-04-26 14:57:08 博主文章分类:消息中间件 ©著作权 文章标签 Active 集群 MQ 文章分类 代码人生 ©著作权归作者所有:来自51CTO博客作者我爱大金子的原创作品,请联系作者获取转载授权,否则将追究法律责任 一、简介1.1 消费者集群(Queue consumer clusters)ActiveMQ支持Consumer对消息高可靠性的负载平衡消费,如果一个Consumer死掉,该消息会转发到其它的Consumer消费的Queue上。如果一个Consumer获得消息比其它Consumer快,那么他将获得更多的消息。因此推荐ActiveMQ的Broker和Client使用failover://transport的方式来配置链接。1.2 Broker clusters大部情况下是使用一系列的Broker和Client链接到一起。如果一个Broker死掉了,Client可以自动链接到其它Broker上。实现以上行为需要用failover协议作为Client。如果启动了多个Broker,Client可以使用static discover或者 Dynamic discovery容易的从一个broker到另一个broker直接链接。这样当一个broker上没有Consumer的话,那么它的消息不会被消费的,然而该broker会通过存储和转发的策略来把该消息发到其它broker上。特别注意: ActiveMQ默认的两个broker,static链接后是单方向的,broker-A可以访问消费broker-B的消息,如果要支持双向通信,需要在 netWorkConnector配置的时候,设置duplex=true 就可以了。操作,服务端搭建静态网络连接与消息回流消息者1:public void test1() throws Exception { ConnectionFactory cf = new ActiveMQConnectionFactory("liuy","123456","tcp://192.168.175.13:61676"); Connection connection = cf.createConnection(); connection.start(); final Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); Destination destination = session.createQueue("my-queue"); for (int i = 0; i < 1; i++) { MessageConsumer consumer = session.createConsumer(destination); consumer.setMessageListener(new MessageListener() { @Override public void onMessage(Message message) { TextMessage m = (TextMessage) message; try { System.out.println("===收到11111111:" + m.getText()); session.commit(); } catch (JMSException e) { e.printStackTrace(); } } }); } }消息者2:public void test1() throws Exception { ConnectionFactory cf = new ActiveMQConnectionFactory("liuy","123456","tcp://192.168.175.13:61616"); Connection connection = cf.createConnection(); connection.start(); final Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); Destination destination = session.createQueue("my-queue"); for (int i = 0; i < 1; i++) { MessageConsumer consumer = session.createConsumer(destination); consumer.setMessageListener(new MessageListener() { @Override public void onMessage(Message message) { TextMessage m = (TextMessage) message; try { System.out.println("===收到222222222:" + m.getText()); session.commit(); } catch (JMSException e) { e.printStackTrace(); } } }); } }生产者:public void test1() throws Exception { ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("liuy","123456","failover:(tcp://192.168.175.13:61616,tcp://192.168.175.13:61676)"); Connection connection = connectionFactory.createConnection(); connection.start(); Session session = connection.createSession(Boolean.TRUE,Session.AUTO_ACKNOWLEDGE); Destination destination = session.createQueue("my-queue"); MessageProducer producer = session.createProducer(destination); for (int i = 0; i < 30; i++) { TextMessage message = session.createTextMessage("message--" + i); Thread.sleep(1000); producer.send(message); } session.commit(); session.close(); connection.close(); }效果: 二、主从节点的集群(Master Slave)在5.9的版本里面,废除了Pure Master Slave的方式,目前支持: 1:Shared File System Master Slave: 基于共享储存的Master-Slave:多个broker实例使用一个存储文件,谁拿到文件锁就是master,其他处于待启动状态,如果master挂掉了, 某个抢到文件锁的slave变成master 2:JDBC Master Slave:基于JDBC的Master-Slave:使用同一个数据库,拿到LOCK表的写锁的broker成为master 3:Replicated LevelDB Store:基于ZooKeeper复制LevelDB存储的Master-Slave机制,这个是5.9新加的具体的可以到官方察看: http://activemq.apache.org/masterslave.html 注意:这里可以不要静态连接与回流了2.1 JDBC Master Slave的方式2.1.1 简介利用数据库作为数据源,采用Master/Slave模式,其中在启动的时候Master首先获得独有锁,其它Slaves Broker则等待获取独有锁。推荐客户端使用Failover来链接Brokers。具体如下图所示: Master失败 如果Master失败,则它释放独有锁,其他Slaver则获取独有锁,其它Slaver立即获得独有锁后此时它将变成Master,并且启动所有的传输链接。 同时,Client将停止链接之前的Master和将会轮询链接到其他可以利用的Broker即新Master。如上中图所示Master重启 任何时候去启动新的Broker,即作为新的Slave来加入集群,如上右图所示2.1.2 JDBC Master Slave的配置使用<jdbcPersistenceAdapter/>来配置消息的持久化,自动就会使用JDBC Master Slave的方式。参考:ActiveMQ消息存储持久化 里的jdbc去掉静态连接:参考ActiveMQ的静态网络链接去掉回流:参考集群下的消息回流功能注意:在配置JDBC时,注意配置useDatabaseLock="true",如下 <jdbcPersistenceAdapter dataSource="#mysql-ds" useDatabaseLock="true" />必需设置,不然在保存数据时会报数据主键重复异常2.1.3 测试生产30个消息:public void test1() throws Exception { ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("liuy","123456","failover:(tcp://192.168.175.13:61616,tcp://192.168.175.13:61676)"); Connection connection = connectionFactory.createConnection(); connection.start(); Session session = connection.createSession(Boolean.TRUE,Session.AUTO_ACKNOWLEDGE); Destination destination = session.createQueue("my-queue"); MessageProducer producer = session.createProducer(destination); for (int i = 0; i < 30; i++) { TextMessage message = session.createTextMessage("message--" + i); Thread.sleep(1000); producer.send(message); } session.commit(); session.close(); connection.close(); } 消费3个消息:public void test1() throws Exception { ConnectionFactory cf = new ActiveMQConnectionFactory("liuy","123456","failover:(tcp://192.168.175.13:61616,tcp://192.168.175.13:61676)"); Connection connection = cf.createConnection(); connection.start(); final Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); Destination destination = session.createQueue("my-queue"); MessageConsumer consumer = session.createConsumer(destination); int i=0; while(i<3) { i++; TextMessage message = (TextMessage) consumer.receive(); session.commit(); System.out.println("收到消 息:" + message.getText()); } session.close(); connection.close(); } 然后关闭61616这台mq,进入到61676界面: 赞 收藏 评论 分享 举报 上一篇:ActiveMQ(12):容错的链接和动态网络连接 下一篇:ActiveMQ(14):Destination(目的地)高级特性 提问和评论都可以,用心的回复会被更多人看到 评论 发布评论 全部评论 () 最热 最新 相关文章 chan实现生产者消费者模型 一个简单的例子让你更好的理解golang chan的使用 斐波拉契数列 斐波那契数列 Group C# 模仿队列线程,模仿消费者和生产者 cancellationTokenSource.Token.IsCancellationRequested:可以通过外部设置取消线程任务执行。cancellationTokenSource.Token:如果IsCancellationRequested为false会抛出异常OperationCanceledException终止线程执行 构造函数 抛出异常 代码实现 契约测试?生产者?消费者?一文帮你理清楚 五星上将麦克阿瑟曾经说过“在契约测试面前,集成测试就是个弟弟“一让我们来讲一个故事今天和女朋友吵架了,(假设你有女朋友)。今晚又是一个人睡沙发,这天晚上,你躺在沙发上,夜不能寐决定分享一下今天的主题——锲约测试契约测试什么是契约?如果从契约产生的阶段来说,现有资料表明最早要追溯到西周时期的《周恭王三年裘卫典田契》,将契约文字刻写在器皿上,就是为了使契文中规定的内容得到多方承认、信守,“万年永宝用” 微服务 ide 测试 契约测试 软件测试 activeMq SpringBoot集成 消费者 activemq集群 ActiveMQ具有强大和灵活的集群功能,ActiveMQ的集群方式主要由两种:Master-Slave和Broker Cluster。一、Master-Slave部署方式 Master-Slave方式中,只能是Master提供服务,Slave是实时地备份Master的数据,以保证消息的可靠性。当Master失效时,Slave会自动升级为Master,客户端会自动连接到Slave上工作。Maste activemq 负载均衡 高可用 客户端 activeMQ做做消费者 activemq指定消费者 刚刚接触activemq,网上也有很多资料,需要花很多时间去整理资料和自我尝试,有的能成功,有的也可能是因为自己的步骤原因导致调试失败,所以特意总结了一下自己所学习到的知识,与大家分享一下.activemq发送指定消息给指定的人: 1. 消费者代码 package com.clgg.job.activemq.queue;import javax.jms.Connection;impor activeMQ做做消费者 activemq java System 发送消息 ActiveMQ 消费者代码 activemq指定消费者 ActiveMQ中一般有两种消息队列,一是点对点模式(p2p),二是发布/订阅模式(pub/sub)。 在进行demo测试之前,我们先建立一个maven工程,引入相应的包:<dependency> <groupId>org.apache.activemq&l ActiveMQ 消费者代码 接收端 点对点 启动事务 activemq 消费者过滤 activemq指定消费者 1. Exclusive Consumer 独有消费者:Queue中的消息是按照顺序被分发到consumer的,然而,当你有多个consumers同时从相同的queue中提取消息时,你将失去这个保证。因为这些消息是被多个线程并发的处理。有的时候,保证消息按照顺序处理是很重要的。例如:你可能不希望在插入订单操作结束之前执行更新这个订单的操作。 ActiveMQ从4.x版本开始支持Exclusiv activemq 消费者过滤 优先级 离线 持久化 activeMQ 消费者注解 activemq指定消费者 消费者特性: 1、Consumer Dispatch Async 默认情况下ActiveMQ服务端采用异步方式向客户端推送消息, 从ActudiMQv4开始,已经可以配置broker向消费者转发消息的机制是同步或者是异步, 你可以选择在connection URI, Connectio activeMQ 消费者注解 消息发送 时间间隔 离线 ssm集成activemq消费者 activemq集群部署 使用ZooKeeper实现的MasterSlave实现方式, 是对ActiveMQ进行高可用的一种有效的解决方案, 高可用的原理:使用ZooKeeper(集群)注册所有的ActiveMQ Broker。只有其中的一个Broker可以对外提供服务( 也就是Master节点) ,其 ssm集成activemq消费者 ActiveMQ消息队列 集群 activemq集群搭建 ActiveMQ+zookeeper集群 activemq 消费者异步接收 activemq多个消费者 消息队列一般有两种模型1.点对点模型(基于队列 Point to Point,PTP) 每个消息只能有一个消费者。消息的生产者和消费者之间没有时间上的 相关性.可以有多个发送者,但只能被一个消费者消费。 一个消息只能被一个接受者接受一次 生产者把消息发送到队列中(Queue),接受者无需订阅,当接受者未接受到消息时就会处于阻塞状态2. 发布者/订阅者模型(基于主题的Publish/Su activemq 消费者异步接收 ActiveMQ 点对点 java spring ActiveMq消费者消息处理 activemq有消费者不消费 概念: 消息生产者发送消息到Queue中,然后消息消费者从Queue中获取并消费消息。 消息消费以后,Queue中不在存储消息,消费者不可能消费到已经被消费的消息, Queue支持多个消费者,但单条消息,只会有一个消费者可以消费,其他的则不能消费此消息 当消费者不存在时,消息会一直保存,直到有消 ActiveMq消费者消息处理 中间件 java apache 发送消息 activemq 多个消费者 ack activemq消费者数量配置 目录activeMQ说明spring boot pom配置文件application.properties配置项目图片启动类生产者消费者1消费者2Queue与Topoic并存,配置运行参考资料 activeMQ说明JMS规范两种常用的消息模型:点对点(point to point ,queue)和发布/订阅(publish/subscribe,topic)。 点对点: 消息生产者 activemq 多个消费者 ack spring activemq java System activemq 查看当前消费者 activemq查看集群状态 单纯根据《ActiveMQ In Action(Manning-2011)》一书介绍的总结,部分介绍可能已经和官网不一。一、ActiveMQ的高可用性ActiveMQ使用master-slave模式实现高可用性,提供两种实现主从模式的配置:shared nothing、shared storage(a relational database and a shared file system)1.s activemq 查看当前消费者 activemq 消息存储 应用程序 apache activemq 消费者怎么响应生产者 activemq 消费者堵塞 该文描述了解决问题的过程 大概描述的是PooledConnectionFactory连接池 连接大面积释放时造成系统拥堵的解决方案 如仅关心解决方案请拖到最下方即可 涉及:JAVA + Spring + ActiveMQ + PooledConnectionFactory前情提要:随着业务的增长 在一个月黑风高的晚上 由于MQ获取不到连接 导致平台无法正常运行 于是乎修改了最大 activemq 消费者怎么响应生产者 java activemq 解决方案 连接数 String boot activeMq 消费者 rabbitmq消费者 RabbitMQ整体上是一个生产者与消费者模型,主要负责接收、存储和转发消息。生产者和消费者 Producere:生产者,就是消息投递的一方。 生产者创建消息之后发布到RabbitMQ中。 java 消息路由 客户端 字符串 ActiveMQ_消费者编码 public class JmsConsumer { public static final String ACTIVEMQ_URL = "tcp://192.168.xx.xx:61616"; public static void main(String[] args) throws Except ide 非阻塞 监听器 ActiveMQ kafak 消费者 java kafka消费者集群 一、 简介Kafka是用scala语言编写,由Linkedin公司于2010年贡献给Apache成为一个开源的消息系统,它主要用于处理活跃的流式数据。遵从一般的MQ结构。Kafka对消息保存时根据Topic进行归类,此外kafka集群有多个kafka实例组成,每个实例(server)称为broker。Kafka是依赖于zookeeper集群保存一些meta信息, kafak 消费者 java kafka zookeeper spring activemq的topic没有消费者 activemq queue 多消费 ActiveMQ中有两种消费模式,Queue(点对点)和Topic (发布/订阅),存储模式也分为非持久化和持久化。由于使用非持久化存储消息只会存储在内存中,容易造成消息丢失,实际生产环境中使用较少,因此重点介绍持久化下Queue消费。Queue模式下,允许同时有多个消费者,但是一条消息只会被其中一个消费者消费一次,ActiveMQ是如何实现这种机制的呢?我们先从Broker获取消费者需要的消息看 activemq的topic没有消费者 ActiveMQ Queue 消息分发 负载均衡 acticemq Active Subscribers中无消费者 activemq 消费者堵塞 MQ的作用削峰:如秒杀业务在某一段时间访问量剧增,导致服务器压力过大。使用了MQ后,服务接收到请求发送到mq,然后直接结束。解耦:A服务需要调用B和C服务,哪天新增了D服务,则A服务还需要改动,这样很麻烦。使用MQ,则A服务只需要把请求发给MQ, BCD服务订阅A服务的请求即可。 异步:A需要调用B、C、D服务都是50毫秒, 但B调B1需要2秒,最后A的调用时间也超过了2秒。 非阻塞 监听器 System activemq 查看消息 activemq页面查看消费者 增加一个消息消费者在transportConnection中注册了一个消息监听器DefaultTransportListener 当客户端有动作的的时候,通过消息监听器的oncommand进行处理。当有一个消费者进入,则调用该类中的processAddConsumer 方法。具体调用流程如下:消息到达服务器端之后,会被Subscription的add方法处理。主要内容是判断消息所属的destina activemq 查看消息 发送消息 客户端 持久化