本文提供相关源码,请放心食用,详见网页侧边栏或底部,有疑问请评论或 Issue
一、ActiveMQ
1.1 什么是 ActiveMQ
ActiveMQ
是Apache出品,最流行的,能力强劲的开源消息总线
。ActiveMQ 是一个完全支持 JMS1.1
和 J2EE 1.4
规 范的 JMS Provider
实现,尽管 JMS
规范出台已经是很久的事情了,但是 JMS 在当今的 J2EE 应用中间仍然扮演着特殊的地位。
主要特点:
多种语言和协议编写客户端。语言: Java, C, C++, C#, Ruby, Perl, Python, PHP。应用协议: OpenWire,Stomp REST,WS Notification,XMPP,AMQP
完全支持 JMS1.1 和 J2EE 1.4 规范 (持久化,XA消息,事务)
对 Spring 的支持 ,ActiveMQ 可以很容易内嵌到使用Spring的系统里面去,而且也支持 Spring2.0 的特性
通过了常见 J2EE 服务器(如 Geronimo,JBoss 4, GlassFish,WebLogic)的测试,其中通过 JCA 1.5 resource adaptors 的配置,可以让 ActiveMQ 可以自动的部署到任何兼容 J2EE 1.4 商业服务器上
支持多种传送协议:in-VM,TCP,SSL,NIO,UDP,JGroups,JXTA
支持通过 JDBC 和 journal 提供高速的消息持久化
从设计上保证了高性能的集群,客户端-服务器,点对点
支持 Ajax
可以很容易得调用内嵌 JMS provider,进行测试
1.2 消息形式
JMS定义了五种不同的消息正文格式,以及调用的消息类型,允许你发送并接收以一些不同形式的数据,提供现有消息格式的一些级别的兼容性。
StreamMessage
– Java原始值的数据流
MapMessage
–键值对
TextMessage
–字符串对象 (主要使用,什么都可以转JSON字符串)
ObjectMessage
–序列化的 Java对象
BytesMessage
–字节的数据流
二、安装 ActiveMQ
本次测试版本为当前最新的 ActiveMQ 5.15.3 Release
,点击下载
解压并放在 usr/local
目录下:
1 2 3 4 5 6 root@ubuntu:/home/wxs# tar zxvf apache-activemq-5.15.3-bin.tar.gz root@ubuntu:/home/wxs# mv apache-activemq-5.15.3 /usr/local/activemq root@ubuntu:/home/wxs# cd /usr/local/activemq/ root@ubuntu:/usr/local/activemq# ls activemq-all-5.15.3.jar conf docs lib NOTICE webapps bin data examples LICENSE README.txt webapps-demo
启动 ActiveMQ
(需要jdk支持):
启动:./activemq start
停止:./activemq stop
1 2 3 4 5 6 root@ubuntu:/usr/local/activemq# cd bin/ root@ubuntu:/usr/local/activemq/bin# ./activemq start INFO: Loading '/usr/local/activemq//bin/env' INFO: Using java '/usr/local/jdk1.8.0_161/bin/java' INFO: Starting - inspect logfiles specified in logging.properties and log4j.properties to get details INFO: pidfile created : '/usr/local/activemq//data/activemq.pid' (pid '6914')
ActiveMQ 默认运行在 8161
端口,点击首页的 Manage ActiveMQ broker
进入后台管理系统,用户名:admin ,密码:admin .
三、Queue
前面说过 ActiveMQ
有两种消息模式,即:
首先介绍下点对点模式
,即 Queue
。
首先导入 ActiveMQ 依赖:
1 2 3 4 5 <dependency > <groupId > org.apache.activemq</groupId > <artifactId > activemq-all</artifactId > <version > 5.15.3</version > </dependency >
3.1 Producer
首先我们编写下消息的发布者(Producer
),因为 ActiveMQ
遵循 JMS
规范,所以步骤比较繁琐,步骤如下:
创建一个连接工厂对象 ,指定服务 IP 和端口
使用工厂对象创建 Collection 对象
开启连接
创建 Session 对象
使用 Session 对象创建 Destination 对象
使用 Session 对象创建 Producer 对象
创建 Message 对象
发送消息
关闭资源
编写一个测试方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 @Test public void testQueueProducer () throws Exception { String brokerURL = "tcp://192.168.30.155:61616" ; ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerURL); Connection connection = connectionFactory.createConnection(); connection.start(); Session session = connection.createSession(false , Session.AUTO_ACKNOWLEDGE); Queue queue = session.createQueue("test-queue" ); MessageProducer producer = session.createProducer(queue); TextMessage message = session.createTextMessage("It just a test queue..." ); producer.send(message); producer.close(); session.close(); connection.close(); }
执行后查看 ActiveMQ 后台,选择 Queue
选项卡可以看到我们创建的 Queue 对象 test-queue:
点进去可以看到发送的消息内容:
3.2 Consumer
现在我们已经把消息发送出去了,但是还没有消费者去接收这个消息,下面开始编写消费者,步骤如下:
创建一个连接工厂对象 ,指定服务IP和端口
使用工厂对象创建 Collection 对象
开启连接
创建 Session 对象
使用 Session 对象创建 Destination 对象
使用 Session 对象创建 Consumer 对象
接收消息
关闭资源
可以看到1 ~ 5步和之前都是一样的,编写测试方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 @Test public void testQueueConsumer () throws Exception { String brokerURL = "tcp://192.168.30.155:61616" ; ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerURL); Connection connection = connectionFactory.createConnection(); connection.start(); Session session = connection.createSession(false , Session.AUTO_ACKNOWLEDGE); Queue queue = session.createQueue("test-queue" ); MessageConsumer consumer = session.createConsumer(queue); consumer.setMessageListener(message -> { try { TextMessage msg = (TextMessage) message; System.out.println("接收到消息:" + msg.getText()); } catch (JMSException e) { e.printStackTrace(); } }); System.in.read(); consumer.close(); session.close(); connection.close(); }
运行程序,接收到了之前的消息:
此时重新查看后台,发现 Number Of Pending Messages
的值已经变成 0,说明刚刚那条消息已经被消费掉了,Number Of Consumers
值变为了 1。
四、Topic
下面介绍下发布/订阅模式
,即 Topic
,当一个生产者发送了消息后。多个订阅了该生产者的消费者都会收到消息。
使用 Queue
时,因为是一对一,所以消费者没有接收的话消息会被保存到 ActiveMQ 后台,我们可以查看得到,如上面的图示。
但是 Topic
不一样,ActiveMQ 后台是不会保存消息的,因此如果消费者没有接收的话,这个消息就丢失了 。
因此,我们必须先启动消费者,生产者再发送消息 ,消费者我才能收到。
4.1 Producer
Topic 的生产者和 Queue 生产者的步骤几乎一样,只是第 5 步创建 Destination
对象的实现类修改为 Topic 即可:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 @Test public void testTopicProducer () throws Exception { String brokerURL = "tcp://192.168.30.155:61616" ; ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerURL); Connection connection = connectionFactory.createConnection(); connection.start(); Session session = connection.createSession(false , Session.AUTO_ACKNOWLEDGE); Topic topic = session.createTopic("test-topic" ); MessageProducer producer = session.createProducer(topic); TextMessage message = session.createTextMessage("It just a test topic..." ); producer.send(message); producer.close(); session.close(); connection.close(); }
进入后台查看,test-topic 拥有 0 个消费者,1 个消息入队,0 个消息入队。前面说过 ActiveMQ 后台是不会保存 Topic 的消息的,所以我们刚刚发送的消息因为没有消费者就丢失了。
4.2 Consumer
和 Queue 的 Consumer 差不多,只是修改第5步改为创建 Topic 即可:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 @Test public void testTopicConsumer () throws Exception { String brokerURL = "tcp://192.168.30.155:61616" ; ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerURL); Connection connection = connectionFactory.createConnection(); connection.start(); Session session = connection.createSession(false , Session.AUTO_ACKNOWLEDGE); Topic topic = session.createTopic("test-topic" ); MessageConsumer consumer = session.createConsumer(topic); consumer.setMessageListener(message -> { try { TextMessage msg = (TextMessage) message; System.out.println("接收到消息:" + msg.getText()); } catch (JMSException e) { e.printStackTrace(); } }); System.in.read(); consumer.close(); session.close(); connection.close(); }