ActiveMQ - Spring框架集成
# Spring 整合 ActiveMQ
之前介绍的内容也很重要,他更灵活,他支持各种自定义功能,可以满足我们工作中复杂的需求。很多 ActiveMQ 的功能,我们要看官方文档或者博客,这些功能大多是在上面代码的基础上修改完善的。如果非要把这些功能强行整合到 Spring,就有些缘木求鱼了。我认为另一种方式整合 Spring 更好,就是将上面的类注入到 Spring 中,其他不变。这样既能保持原生的代码,又能集成到 Spring。
下面的 Spring 和 SpringBoot 整合 ActiveMQ 也重要,他给我们提供了一个模板,简化了代码,减少我们工作中遇到坑,能够满足开发中 90% 以上的功能。
# 添加依赖
<dependencies>
<!-- activemq 所需要的jar 包-->
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>5.16.3</version>
</dependency>
<!-- activemq 和 spring 整合的基础包 -->
<dependency>
<groupId>org.apache.xbean</groupId>
<artifactId>xbean-spring</artifactId>
<version>3.16</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.10.1</version>
</dependency>
<!-- activemq连接池 -->
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-pool</artifactId>
<version>5.15.10</version>
</dependency>
<!-- spring支持jms的包 -->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jms</artifactId>
<version>5.3.10</version>
</dependency>
<!-- Spring核心依赖 -->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-core</artifactId>
<version>5.3.10</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
<version>5.3.10</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-aop</artifactId>
<version>5.3.10</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-orm</artifactId>
<version>5.3.9</version>
</dependency>
</dependencies>
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
# ActiveMQ 配置文件
applicationContext.xml:其中 20-24 行代码是队列,26-29 行代码是主题,目前是队列,如果需要主题,去 36 行代码修改为主题的 beanid 即可。
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context https://www.springframework.org/schema/context/spring-context.xsd">
<!-- 开启包的自动扫描 -->
<context:component-scan base-package="com.eight.spring"/>
<!-- 配置生产者 -->
<bean id="connectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory" destroy-method="stop">
<property name="connectionFactory">
<!-- 正真可以生产Connection的ConnectionFactory,由对应的JMS服务商提供 -->
<bean class="org.apache.activemq.spring.ActiveMQConnectionFactory">
<property name="brokerURL" value="tcp://192.168.199.27:61616"/>
</bean>
</property>
<property name="maxConnections" value="100"/>
</bean>
<!-- 这个是队列目的地,点对点的Queue -->
<bean id="destinationQueue" class="org.apache.activemq.command.ActiveMQQueue">
<!-- 通过构造注入Queue名 -->
<constructor-arg index="0" value="spring-active-queue"/>
</bean>
<!-- 这个是队列目的地, 发布订阅的主题Topic-->
<bean id="destinationTopic" class="org.apache.activemq.command.ActiveMQTopic">
<constructor-arg index="0" value="spring-active-topic"/>
</bean>
<!-- Spring提供的JMS工具类,他可以进行消息发送,接收等 -->
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<!-- 传入连接工厂 -->
<property name="connectionFactory" ref="connectionFactory"/>
<!-- 传入目的地 -->
<property name="defaultDestination" ref="destinationQueue"/>
<!-- 消息自动转换器 -->
<property name="messageConverter">
<bean class="org.springframework.jms.support.converter.SimpleMessageConverter"/>
</property>
</bean>
</beans>
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
# 队列的生产者
public class SpringProduce {
@Autowired
private JmsTemplate jmsTemplate;
public static void main(String[] args) {
ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("applicationContext.xml");
SpringProduce produce = context.getBean(SpringProduce.class);
produce.jmsTemplate.send(session -> {
TextMessage message = session.createTextMessage("spring和ActiveMQ整合");
return message;
});
}
}
2
3
4
5
6
7
8
9
10
11
12
13
# 队列的消费者
public class SpringConsumer {
@Autowired
private JmsTemplate jmsTemplate;
public static void main(String[] args) {
ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("applicationContext.xml");
SpringConsumer consumer = context.getBean(SpringConsumer.class);
String message = (String) consumer.jmsTemplate.receiveAndConvert();
System.out.println("收到的消息:" + message);
}
}
2
3
4
5
6
7
8
9
10
11
12
# 消费者的监听类
public class MyMessageListener implements MessageListener {
@Override
public void onMessage(Message message) {
if(message instanceof TextMessage) {
TextMessage textMessage = (TextMessage) message;
try {
System.out.println("" + textMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
# Spring Boot 整合 ActiveMQ
# Queue 生产者
引入依赖
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.5.5</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<!--spring boot整合activemq的jar包-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
<version>2.5.5</version>
</dependency>
</dependencies>
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
application.yml 内容
server:
port: 8888
spring:
activemq:
broker-url: tcp://192.168.199.27:61616 # url
user: admin
password: admin
jms:
pub-sub-domain: false # 目的地是queue还是topic, false(默认) = queue true = topic
myqueue: boot-activemq-queue # 自定义消息队列名字
2
3
4
5
6
7
8
9
10
11
配置目的地的 bean,也可以把下面的方法放到启动类里
@Configuration
@EnableJms // 开启适配 JMS
public class ConfigBean {
@Value("${myqueue}")
private String myQueue;
@Bean
public ActiveMQQueue queue(){
return new ActiveMQQueue(myQueue);
}
}
2
3
4
5
6
7
8
9
10
11
12
队列生产者代码,代码功能:每隔三秒发送一次消息
@Component
public class QueueProducer {
@Autowired
private JmsMessagingTemplate jmsMessagingTemplate;
@Value("${myqueue}")
private String myQueue;
public void productMessage(){
jmsMessagingTemplate.convertAndSend(myQueue, UUID.randomUUID().toString().substring(0,6));
}
// 定时发送,3 秒发送一次
@Scheduled(fixedDelay = 3000)
public void fixedDelay() {
productMessage();
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
主启动类,记得开启 @EnableScheduling 注解,否则上方的 @Scheduled(fixedDelay = 3000)
没有用
@SpringBootApplication
@EnableScheduling // 开启任务定时调度
public class ProducerMain7777 {
public static void main(String[] args) {
SpringApplication.run(ProducerMain7777.class, args);
}
}
2
3
4
5
6
7
8
单元测试(非必须)
@SpringBootTest
public class QueueTest {
@Autowired
private QueueProducer queueProducer;
@Test
public void test(){
queueProducer.productMessage();
}
@Test
public void test2(){
queueProducer.fixedDelay();
}
}
2
3
4
5
6
7
8
9
10
11
12
13
# queue 消费者
pom.xml 和 application.yml 文件和上面 Queue 生产者 的一致。
新增消费者消费信息代码,监听消息队列,指定队列名字(配置文件自定义),一旦队列名里有人发布消息,就能即刻获取
@Component
public class QueueConsumer {
// 注册一个监听器。Destination 指定监听的主题
@JmsListener(destination = "${myqueue}")
public void receive(TextMessage message) throws JMSException {
System.out.println("收到的消息:" + message.getText());
}
}
2
3
4
5
6
7
8
# Topic 生产者
和 Queue 生产者 代码基本一样
application.yml (具体看第 9 行代码)
server:
port: 7777
spring:
activemq:
broker-url: tcp://192.168.199.27:61616
user: admin
password: admin
jms:
pub-sub-domain: true # 目的地是 Queue 还是 Topic,false(默认)指的是 queue,true 指的是 topic
mytopic: boot-activemq-topic
2
3
4
5
6
7
8
9
10
11
ConfigBean 的 ActiveMQQueue 变为 ActiveMQTopic
@Configuration
@EnableJms // 适配 JMS
public class ConfigBean {
@Value("${mytopic}")
private String myTopic;
@Bean
public ActiveMQTopic topic(){
return new ActiveMQTopic(myTopic);
}
}
2
3
4
5
6
7
8
9
10
11
12
队列生产者代码
@Component
public class QueueProducer {
@Autowired
private JmsMessagingTemplate jmsMessagingTemplate;
@Value("${mytopic}")
private String myTopic;
public void productMessage(){
jmsMessagingTemplate.convertAndSend(myTopic, UUID.randomUUID().toString().substring(0,6));
}
// 定时发送,3秒发送一次
@Scheduled(fixedDelay = 3000)
public void fixedDelay() {
productMessage();
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# Topic 消费者
pom.xml 和 application.yml 与 Queue 消费者 一样。
消费者消费信息代码
@Component
public class Topic_Consummer {
@JmsListener(destination = "${mytopic}")
public void receive(TextMessage textMessage) throws Exception{
System.out.println("消费者受到订阅的主题:"+textMessage.getText());
}
}
2
3
4
5
6
7