ActiveMQ初探(3)——与SpringBoot整合

发送即时消息

Step1:添加依赖包:

这里我们不使用网上大多数的spring-boot-starter-activemq,因为我配的时候总是有问题,使用如下的三个依赖包,分别是activemq连接池包activemq包jms包

<!-- ActiveMQ连接池 -->
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-pool</artifactId>
<version>5.15.3</version>
</dependency>
<!-- ActiveMQ -->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jms</artifactId>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>5.13.3</version>
</dependency>

Step2:添加配置信息:

在配置文件application.properties中,添加配置信息:

#ActiveMQ
#默认为内存中的activemq,设为false关闭掉
spring.activemq.in-memory=false
#连接地址,注释的为集群版
spring.activemq.broker-url=tcp://192.168.30.188:61616
#spring.activemq.broker-url=failover:(tcp://localhost:61616,tcp://localhost:61617)
spring.activemq.close-timeout=500ms
spring.activemq.send-timeout=3000ms
#开启ActiveMQ连接池
spring.activemq.pool.enabled=true
#连接池最大连接数
spring.activemq.pool.max-connections=100
#空闲的连接过期时间,默认为30秒
spring.activemq.pool.idle-timeout=30s

Step3:编写Producer

编写MQProducer.java用于提供通用的Producer。

首先为该类添加@Component注解加入容器,然后将JmsMessagingTemplate注入进来,它用于消息的发送,它构建于JmsTemplate顶部,并提供了消息抽象的集成。

在方法中调用JmsMessagingTemplateconvertAndSend()方法就能实现消息的发送。

代码如下:

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsMessagingTemplate;
import org.springframework.stereotype.Component;

import javax.jms.Destination;

/**
* ActiveMQ服务提供
* @author jitwxs
* @since 2018/5/8 16:56
*/
@Component
public class MQProducer {
@Autowired
private JmsMessagingTemplate jmsMessagingTemplate;

/**
* 即时发送
* @param message 内容,一般为JSON
* @author jitwxs
* @since 2018/5/8 21:08
*/
public void send(Destination destination, String message){
jmsMessagingTemplate.convertAndSend(destination,message);
}
}

Step4:发送消息

假设我们要发送一个名为test-queue的点对点消息,让我们编写一个方法,并在其中实现消息的发送。

编写一个Controller,并在其中将MQProducer 注入进来,调用其send()方法。

import org.apache.activemq.command.ActiveMQQueue;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;

import javax.jms.Destination;

/**
* @author jitwxs
* @since 2018/5/8 23:17
*/
@RestController
public class MQController {
@Autowired
private MQProducer producer;

@GetMapping("/queue/{info}")
public void sendQueue(@PathVariable String info) {

// 创建一个Queue的Destination,参数为Destination的名称
Destination destination = new ActiveMQQueue("test-queue");
// 发送消息。参数1:Destination;参数2:消息内容
producer.send(destination,info);
}
}

Step5:编写Consumer

消息是发出去了,但是现在还没有Consumer来消费,让我们创建一个Consumer。

创建TestQueueConsumer类,也要添加@Component注解加入容器,写一个接收方法,在方法上加入@JmsListener注解,用于指定要接收destination的名称:

import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;

/**
* @author jitwxs
* @since 2018/5/8 23:23
*/
@Component
public class TestQueueConsumer {

// 值为要接收的destination的名称
@JmsListener(destination = "test-queue")
public void receiveQueue(String text) {
System.out.println("接收到消息:" + text);
}
}

大功告成,让我们测试一下:
这里写图片描述


发送延时消息

下面再说一个如何发送延时消息,该功能可以实现指定多少秒后发送消息。

例如在电商网站中举办抢购活动,用户点击了抢购前提醒功能,那么我们设置延时消息,在活动开始前提醒用户。

又比如在我的一个程序中,使用延时消息完成了设备的定时开/关闭,这个程序链接点这里,有兴趣可以看看。

Step1:开启延时消息

ActiveMQ默认没有开启延时消息,编辑activemq目录下的conf/activemq.xml文件,修改第20行,在末尾加上schedulerSupport="true",如图所示:
这里写图片描述

重新启动ActiveMQ(./activemq restart)即可。

Step2:编写发送延时消息的方法

MQProducer中,新增一个delaySend()方法,用于发送延时消息:

/**
* 延时发送
* @param message 内容,一般为JSON
* @param time 时间(单位:ms)
* @author jitwxs
* @since 2018/5/8 21:08
*/
public void delaySend(Destination destination,String message, Long time) {
// 得到jmsTemplate
JmsTemplate jmsTemplate = jmsMessagingTemplate.getJmsTemplate();
// 发送消息
jmsTemplate.send(destination, session -> {
TextMessage textMessage = session.createTextMessage(message);
// 设置延时时间【关键】
textMessage.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, time);
return textMessage;
});
}

Step3:测试延时消息

在Controller中新增一个方法,用于测试延时消息:

@GetMapping("/delay/{time}/queue/{info}")
public void sendDelayQueue(@PathVariable Long time, @PathVariable String info) {
// 创建一个Queue的Destination,参数为Destination的名称
Destination destination = new ActiveMQQueue("test-queue");
// 发送消息。参数1:Destination;参数2:消息内容;参数3:延时(ms)
producer.delaySend(destination,info,time * 1000);
System.out.println("发送时间:" + new Date());
}

为了体现效果,我在最后一行打印了当前时间,让我们也在接收者那边加一行打印时间:

// 值为要接收的destination的名称
@JmsListener(destination = "test-queue")
public void receiveQueue(String text) {
System.out.println("接收到消息:" + text);
System.out.println("接收时间:" + new Date());
}

大功告成,测试一下:

这里写图片描述

在参数中,设置了延时5s,消息内容为hello world,输出结果正确!

源码地址:https://github.com/jitwxs/blog_sample

分享到
Open Chat