本文提供相关源码,请放心食用,详见网页侧边栏或底部,有疑问请评论或 Issue
一、环境准备
与 Spring 整合,除了原本的 activemq-all
外,还需导入 spring-context-support
和 spring-jms
包,如果 Spring 为5.0+,需要 javax.jms-api
依赖:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-all</artifactId> <version>5.15.3</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-context-support</artifactId> <version>5.0.4.RELEASE</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-jms</artifactId> <version>5.0.4.RELEASE</version> </dependency>
<dependency> <groupId>javax.jms</groupId> <artifactId>javax.jms-api</artifactId> <version>2.0.1</version> </dependency>
|
新建配置文件 cfg.properties
,并在其中添加ActiveMQ的相关信息:
cfg.properties1 2 3
| activemq.broker_url=tcp://192.168.30.188:61616 activemq.queue_name=test-queue activemq.topic_name=test-topic
|
编写 ActiveMQ 的配置文件 applicationContext-activemq.xml
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39
| <?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd">
<context:property-placeholder location="classpath:cfg.properties"/>
<bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"> <property name="brokerURL" value="${activemq.broker_url}" /> </bean>
<bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory"> <property name="targetConnectionFactory" ref="targetConnectionFactory" /> </bean>
<bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue"> <constructor-arg name="name" value="${activemq.queue_name}"/> </bean> <bean id="topicDestination" class="org.apache.activemq.command.ActiveMQTopic"> <constructor-arg name="name" value="${activemq.topic_name}"/> </bean>
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"> <property name="connectionFactory" ref="connectionFactory" /> </bean> </beans>
|
配置文件中核心就是 jmsTemplate
,因为MQ中间件不止 ActiveMQ 一种,因此只要符合 JMS 规范,则都可以使用 jmsTemplate
来管理,只需要修改底层的实现就可以了(底层实现即 targetConnectionFactory
)。
在配置文件中还准备了两个 Destination
对象,即 queueDestination
和 topicDestination
,分别是对应了 Queue 模式和 Topic 模式。
二、消息发送
编写一个生产者的示例代码,步骤大概如下:
- 加载容器
- 从容器中获取 jmsTemplate 对象
- 从容器中获取 Destination 对象
- 发送消息
1 2 3 4 5 6 7 8 9 10 11 12
| @Test public void testProducer() { ApplicationContext ac = new ClassPathXmlApplicationContext("classpath:spring/applicationContext-activemq.xml"); JmsTemplate jmsTemplate = ac.getBean(JmsTemplate.class); Destination destination = (Destination)ac.getBean("queueDestination"); jmsTemplate.send(destination, session -> session.createTextMessage("这是Spring与ActiveMQ整合的测试消息")); }
|
三、消息接收
接收消息需要自定义一个实现 MessageListener
的类,在其中打印一句话:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| import javax.jms.Message; import javax.jms.MessageListener; import javax.jms.TextMessage;
public class MyMessageListener implements MessageListener { @Override public void onMessage(Message message) { try { TextMessage textMessage = (TextMessage)message; System.out.println("收到消息:" + textMessage.getText()); } catch (Exception e) { e.printStackTrace(); } } }
|
在Spring 容器中首先要注入 MyMessageListener
,然后配置一个 DefaultMessageListenerContainer
,指定 connectionFactory
、destination
和 messageListener
:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50
| <?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd">
<context:property-placeholder location="classpath:cfg.properties"/>
<bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"> <property name="brokerURL" value="${activemq.broker_url}" /> </bean>
<bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory"> <property name="targetConnectionFactory" ref="targetConnectionFactory" /> </bean>
<bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue"> <constructor-arg name="name" value="${activemq.queue_name}"/> </bean> <bean id="topicDestination" class="org.apache.activemq.command.ActiveMQTopic"> <constructor-arg name="name" value="${activemq.topic_name}"/> </bean>
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"> <property name="connectionFactory" ref="connectionFactory" /> </bean>
<bean id="myMessageListener" class="jit.wxs.activemq.MyMessageListener"/> <bean class="org.springframework.jms.listener.DefaultMessageListenerContainer"> <property name="connectionFactory" ref="connectionFactory" /> <property name="destination" ref="queueDestination" /> <property name="messageListener" ref="myMessageListener" /> </bean> </beans>
|
运行 testProducer()
方法,MessageListener 中接收到消息:
ActiveMQ 初探(2)——与 Spring 整合