Young Kbt blog Young Kbt blog
首页
  • java基础

    • Java基础
    • Java集合
    • Java反射
    • JavaJUC
    • JavaJVM
  • Java容器

    • JavaWeb
  • Java版本新特性

    • Java新特性
  • SQL 数据库

    • MySQL
    • Oracle
  • NoSQL 数据库

    • Redis
    • ElasticSearch
  • 数据库

    • MyBatis
    • MyBatis-Plus
  • 消息中间件

    • ActiveMQ
    • RabbitMQ
    • RocketMQ
    • Kafka
  • 进阶服务

    • Nginx
  • Spring
  • Spring Boot
  • Spring Security
  • 设计模式
  • 算法
  • 知识
  • 管理

    • Maven
    • Git
  • 部署

    • Linux
    • Docker
    • Jenkins
    • Kubernetes
  • 进阶

    • TypeScript
  • 框架

    • React
    • Vue2
    • Vue3
  • 轮子工具
  • 项目工程
  • 友情链接
  • 本站

    • 分类
    • 标签
    • 归档
  • 我的

    • 收藏
    • 关于
    • Vue2-Admin (opens new window)
    • Vue3-Admin(完善) (opens new window)
GitHub (opens new window)

Shp Liu

朝圣的使徒,正在走向编程的至高殿堂!
首页
  • java基础

    • Java基础
    • Java集合
    • Java反射
    • JavaJUC
    • JavaJVM
  • Java容器

    • JavaWeb
  • Java版本新特性

    • Java新特性
  • SQL 数据库

    • MySQL
    • Oracle
  • NoSQL 数据库

    • Redis
    • ElasticSearch
  • 数据库

    • MyBatis
    • MyBatis-Plus
  • 消息中间件

    • ActiveMQ
    • RabbitMQ
    • RocketMQ
    • Kafka
  • 进阶服务

    • Nginx
  • Spring
  • Spring Boot
  • Spring Security
  • 设计模式
  • 算法
  • 知识
  • 管理

    • Maven
    • Git
  • 部署

    • Linux
    • Docker
    • Jenkins
    • Kubernetes
  • 进阶

    • TypeScript
  • 框架

    • React
    • Vue2
    • Vue3
  • 轮子工具
  • 项目工程
  • 友情链接
  • 本站

    • 分类
    • 标签
    • 归档
  • 我的

    • 收藏
    • 关于
    • Vue2-Admin (opens new window)
    • Vue3-Admin(完善) (opens new window)
GitHub (opens new window)
  • MyBatis

  • MyBatis-Plus

  • 中间件 - ActiveMQ

    • ActiveMQ - 概述
    • ActiveMQ - 安装
    • ActiveMQ - 队列
    • ActiveMQ - 主题
    • ActiveMQ - JMS规范
    • ActiveMQ - 自带持久化
    • ActiveMQ - 事务性
    • ActiveMQ - Spring框架集成
    • ActiveMQ - 传输协议
    • ActiveMQ - 外部持久化
      • 前言
      • 持久化机制
        • AMQ Message Store
        • kahaDB
        • JDBC 消息存储
        • LevelDB 消息存储
        • JDBC Message Store with ActiveMQ Journal
      • kahaDB 消息存储
        • 介绍
      • JDBC 消息存储
        • 建库 SQL 和创表说明
        • Queue 验证和数据表变化
        • Topic 验证和说明
        • 总结
        • 遇到的坑
      • JDBC Message Store with ActiveMQ Journal
        • 说明
        • 配置
        • 总结
    • ActiveMQ - 高级特性
  • 中间件 - RabbitMQ

  • 中间件 - RocketMQ

  • 中间件 - Kafka

  • 高性能服务器 - Nginx

  • 响应式框架 - Reactor

  • 框架
  • 中间件 - ActiveMQ
Young Kbt
2022-07-30
目录

ActiveMQ - 外部持久化

  • 前言
  • 持久化机制
    • AMQ Message Store
    • kahaDB
    • JDBC 消息存储
    • LevelDB 消息存储
    • JDBC Message Store with ActiveMQ Journal
  • kahaDB 消息存储
    • 介绍
  • JDBC 消息存储
    • 建库 SQL 和创表说明
    • Queue 验证和数据表变化
    • Topic 验证和说明
    • 总结
    • 遇到的坑
  • JDBC Message Store with ActiveMQ Journal
    • 说明
    • 配置
    • 总结

# 前言

本内容的持久化和前面的 ActiveMQ - 自带持久化 的区别

image-20220730160828251

前面的 ActiveMQ - 自带持久化 的事务、可持久、签收,是属于 MQ 自身特性,属于自带的,也是 MQ 的高可用。

这里的持久化是外力,是外部插件。之前讲的持久化是 MQ 自带的外在表现,现在讲的的持久是是底层实现。

持久化是什么?一句话就是:ActiveMQ 宕机了,消息不会丢失的机制。说明:为了避免意外宕机以后丢失信息,需要做到重启后可以恢复消息队列,消息系统一半都会采用持久化机制。ActiveMQ 的消息持久化机制有 JDBC,AMQ,KahaDB 和 LevelDB,无论使用哪种持久化方式,消息的存储逻辑都是一致的。就是在发送者将消息发送出去后,消息中心首先将消息存储到本地数据文件、内存数据库或者远程数据库等。再试图将消息发给接收者,成功则将消息从存储中删除,失败则继续尝试尝试发送。消息中心启动以后,要先检查指定的存储位置是否有未成功发送的消息,如果有,则会先把存储位置中的消息发出去。

# 持久化机制

# AMQ Message Store

基于文件的存储机制,是以前的默认机制,现在不再使用。

AMQ 是一种文件存储形式,它具有写入速度快和容易恢复的特点。消息存储再一个个文件中文件的默认大小为 32M,当一个文件中的消息已经全部被消费,那么这个文件将被标识为可删除,在下一个清除阶段,这个文件被删除。AMQ 适用于 ActiveMQ 5.3 之前的版本。

# kahaDB

目前 ActiveMQ 默认的持久化机制

# JDBC 消息存储

利用数据库存储消息。

# LevelDB 消息存储

新型技术,为了取代 kahaDB。这种文件系统是从 ActiveMQ 5.8 之后引进的,它和 KahaDB 非常相似,也是基于文件的本地数据库储存形式,但是它提供比 KahaDB 更快的持久性。

但它不使用自定义 B-Tree 实现来索引预写日志,而是使用基于 LeveIDB 的索引。

默认配置如下:

<persistenceAdapter>
    <leveIDBdirectory = "activemq-data"/>
</persistenceAdapter>
1
2
3

# JDBC Message Store with ActiveMQ Journal

JDBC 的增强版,先利用高缓存文件存储消息,如果消息没有被立即消费,则慢慢存入数据库。

# kahaDB 消息存储

# 介绍

KahaDB 是目前默认的存储方式,可用于任何场景,提高了性能和恢复能力。消息存储使用一个事务日志和仅仅用一个索引文件来存储它所有的地址。

KahaDB 是一个专门针对消息持久化的解决方案,它对典型的消息使用模式进行了优化。数据被追加到 data logs 中。当不再需要 log 文件中的数据的时候,log 文件会被丢弃。

基于日志文件,从 ActiveMQ 5.4(含)开始默认的持久化插件。

官网文档:http://activemq.aache.org/kahadb

官网上还有一些其他配置参数。

配置文件 activemq.xml 中,如下:

<persistenceAdapter>
    <kahaDB directory=${activemq.data}/kahadb/>
</persistenceAdapter>
1
2
3

日志文件的存储目录在:ActiveMQ 安装目录 /data/kahadb

image-20220730182513967

存储原理:

kahaDB 在消息保存目录中只有4类文件和一个 lock,ActiveMQ 的其他几种文件存储引擎相比这就非常简洁了。

目录结构:

image-20220730182627696

db-<Number>.log:KahaDB 存储消息到预定义大小的数据记录文件中,文件命名为 db-<Number>.log。当数据文件已满时,一个新的文件会随之创建,number 数值也会随之递增,它随着消息数量的增多,如每 32M 一个文件,文件名按照数字进行编号,如 db-1.log、db-2.log、db-3.log。当不再有引用到数据文件中的任何消息时,文件会被删除或归档。

image-20220730182741426

db.data 该文件包含了持久化的 BTree 索引,索引了消息数据记录中的消息,它是消息的索引文件,本质上是 B-Tree(B 树),使用 B-Tree 作为索引指向 db-<Number>.log 里面存储的消息。

db.free 当前 db.data 文件里哪些页面是空闲的,文件具体内容是所有空闲页的 ID

db.redo 用来进行消息恢复,如果 KahaDB 消息存储在强制退出后启动,用于恢复 BTree 索引。

lock 文件锁,表示当前获得 kahaDB 读写权限的 Broker。

# JDBC 消息存储

添加数据库驱动包

添加 MySQL 数据库的驱动包到 lib 文件夹(lib 位于 ActiveMQ 的安装目录下)

mv mysql驱动包 activemq安装目录/lib
1

image-20220730182808512

配置文件改成数据库

打开 conf 下的 activemq.xml 文件,进行 jdbcPersistenceAdapter 配置。

将之前的配置注释掉,替换为 jdbc 的配置。如下:.

<!-- 
<persistenceAdapter>
    <kahaDB directory="${activemq.data}/kahadb"/>
</persistenceAdapter>
-->
<persistenceAdapter>
    <jdbcPersistenceAdapter dataSource="#mysql-ds" createTableOnStartup="true"/>
</persistenceAdapter>
1
2
3
4
5
6
7
8

dataSource 指定将要引用的持久化数据库的 bean 名称,createTablesOnStartup 是否在启动的时候创建数据表,默认值是 true,这样每次启动都会去创建数据表了,一般是第一次启动的时候设置为 true 之后改成 false。

数据库连接池配置

进入数据库,创建名为 activemq 的数据库(可自定义名字,如果自定义名字,下方的代码也稍微修改)

<bean id="mysql-ds" class="org.apache.commons.dbcp2.BasicDataSource" destroy-method="close"> 
    <property name="driverClassName" value="com.mysql.cj.jdbc.Driver"/> 
    <property name="url" value="jdbc:mysql://自己的数据库IP:3306/activemq?relaxAutoCommit=true"/> 
    <property name="username" value="数据库用户名"/> 
    <property name="password" value="数据库密码"/> 
    <property name="poolPreparedStatements" value="true"/> 
</bean>
1
2
3
4
5
6
7

在 </broker> 标签和 <import> 标签之间插入上面的数据库连接池配置

image-20220730183423943

启动前,先去数据库建数据库,名为 activemq,默认是的 dbcp 数据库连接池,如果要换成其他数据库连接池,需要将该连接池 jar 包,也放到 lib 目录下。

# 建库 SQL 和创表说明

重启 ActiveMQ。会自动生成如下 3 张表。如果没有自动生成,需要我们手动执行 SQL。我个人建议要自动生成,我在操作过程中查看日志文件,发现了不少问题,最终解决了这些问题后,是能够自动生成的。如果不能自动生成说明你的操作有问题。如果实在不行,下面是手动建表的 SQL:

-- auto-generated definition
create table ACTIVEMQ_ACKS
(
    CONTAINER     varchar(250)     not null comment '消息的Destination',
    SUB_DEST      varchar(250)     null comment '如果使用的是Static集群,这个字段会有集群其他系统的信息',
    CLIENT_ID     varchar(250)     not null comment '每个订阅者都必须有一个唯一的客户端ID用以区分',
    SUB_NAME      varchar(250)     not null comment '订阅者名称',
    SELECTOR      varchar(250)     null comment '选择器,可以选择只消费满足条件的消息,条件可以用自定义属性实现,可支持多属性AND和OR操作',
    LAST_ACKED_ID bigint           null comment '记录消费过消息的ID',
    PRIORITY      bigint default 5 not null comment '优先级,默认5',
    XID           varchar(250)     null,
    primary key (CONTAINER, CLIENT_ID, SUB_NAME, PRIORITY)
)
comment '用于存储订阅关系。如果是持久化Topic,订阅者和服务器的订阅关系在这个表保存';

create index ACTIVEMQ_ACKS_XIDX
on ACTIVEMQ_ACKS (XID);


-- auto-generated definition
create table ACTIVEMQ_LOCK
(
    ID          bigint       not null
    primary key,
    TIME        bigint       null,
    BROKER_NAME varchar(250) null
);


-- auto-generated definition
create table ACTIVEMQ_MSGS
(
    ID         bigint       not null
    primary key,
    CONTAINER  varchar(250) not null,
    MSGID_PROD varchar(250) null,
    MSGID_SEQ  bigint       null,
    EXPIRATION bigint       null,
    MSG        blob         null,
    PRIORITY   bigint       null,
    XID        varchar(250) null
);

create index ACTIVEMQ_MSGS_CIDX
on ACTIVEMQ_MSGS (CONTAINER);

create index ACTIVEMQ_MSGS_EIDX
on ACTIVEMQ_MSGS (EXPIRATION);

create index ACTIVEMQ_MSGS_MIDX
on ACTIVEMQ_MSGS (MSGID_PROD, MSGID_SEQ);

create index ACTIVEMQ_MSGS_PIDX
on ACTIVEMQ_MSGS (PRIORITY);

create index ACTIVEMQ_MSGS_XIDX
on ACTIVEMQ_MSGS (XID);
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57

SQL 语句创建的各个表字段说明

ACTIVEMQ_MSGS 数据表:

  • ID:自增的数据库主键
  • CONTAINER:消息的 Destination
  • MSGID_PROD:消息发送者的主键
  • MSG_SEQ:是发送消息的顺序,MSGID_PROD+MSG_SEQ 可以组成 JMS 的 MessageID
  • EXPIRATION:消息的过期时间,存储的是从 1970-01-01 到现在的毫秒数
  • MSG:消息本体的 Java 序列化对象的二进制数据
  • PRIORITY:优先级,从 0-9,数值越大优先级越高

ACTIVEMQ_ACKS 数据表用于存储订阅关系。即存储持久订阅的信息和最后一个持久订阅接收的消息 ID,如果是持久化 Topic,订阅者和服务器的订阅关系在这个表保存。数据库字段如下:

  • CONTAINER:消息的 Destination
  • SUB_DEST:如果是使用 Static 集群,这个字段会有集群其他系统的信息
  • CLIENT_ID:每个订阅者都必须有一个唯一的客户端 ID 用以区分
  • SUB_NAME:订阅者名称
  • SELECTOR:选择器,可以选择只消费满足条件的消息。条件可以用自定义属性实现,可支持多属性 AND 和 OR 操作
  • LAST_ACKED_ID:记录消费过的消息的 ID

ACTIVEMQ_LOCK 数据表在集群环境中才有用,只有一个 Broker 可以获得消息,称为 Master Broker,其他的只能作为备份等待 Master Broker 不可用,才可能成为下一个 Master Broker。这个表用于记录哪个 Broker 是当前的 Master Broker。

# Queue 验证和数据表变化

在点对点类型中

  • 当 DeliveryMode 设置为 NON_PERSISTENCE 时,消息被保存在内存中
  • 当 DeliveryMode 设置为 PERSISTENCE 时,消息保存在 Broker 的相应的文件或者数据库中
  • 而且点对点类型中消息一旦被 Consumer 消费就从 Broker 中删除

无论是 Queue 还是 Topic ,都要记得开启持久化,否则上面的配置将没有效果:

producer.setDeliveryMode(DeliveryMode.PERSISTENT);
1

Queue 模式,非持久化不会将消息持久化到数据库。

Queue 模式,持久化会将消息持久化数据库。

我们使用 Queue 模式持久化,发布 3 条消息后,发现 ACTIVEMQ_MSGS 数据表多了 3 条数据

image-20220730184212411

启动消费者,消费了所有的消息后,发现数据表的数据消失了

image-20220730184220261

# Topic 验证和说明

package com.activemq.demo;

import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;

// 持久化topic 的消息消费者
public class JmsConsummer_persistence {
    private static final String ACTIVEMQ_URL = "tcp://118.24.20.3:61626";
    public static final String TOPIC_NAME = "jdbc-02";

    public static void main(String[] args) throws Exception{
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
        Connection connection = activeMQConnectionFactory.createConnection();
        connection.setClientID("marrry");
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Topic topic = session.createTopic(TOPIC_NAME);
        TopicSubscriber topicSubscriber = session.createDurableSubscriber(topic,"remark...");
        connection.start();
        Message message = topicSubscriber.receive();
        while (null != message){
            TextMessage textMessage = (TextMessage)message;
            System.out.println(" 收到的持久化 topic :"+textMessage.getText());
            message = topicSubscriber.receive();
        }
        session.close();
        connection.close();
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28

启动持久化 Topic 的消费者。看到 ACTIVEMQ_ACKS 数据表多了一个消费者的身份信息。一条记录代表:一个持久化 Topic 消费者。

img

我们启动持久化生产者发布 3 个数据,ACTIVEMQ_MSGS 数据表新增 3 条数据,消费者消费所有的数据后,ACTIVEMQ_MSGS 数据表的数据并没有消失。持久化 Topic 的消息不管是否被消费,是否有消费者,产生的数据永远都存在,且只存储一条。这个是要注意的,持久化的 Topic 大量数据后可能导致性能下降。这里就像公总号一样,消费者消费完后,消息还会保留。

img

# 总结

如果是 Queue,在没有消费者消费的情况下,会将消息保存到 activemq_msgs 表中,只有有任意一个消费者已经消费了,消费后的消息就会立即被删除。

如果是 Topic,一般都是先启动消费订阅然后再生产的情况下会将消息保存到 activemq_acks。

# 遇到的坑

数据库 jar 包

记得需要使用到的相关 jar 文件放置到 ActiveMQ 安装路径下的 lib 目录 mysql-jdbc 驱动的 jar 包和对应的数据库连接池 jar 包。

createTablesOnStartup 属性

在 jdbcPersistenceAdapter 标签中设置了 createTablesOnStartup 属性为 true 时,在第一次启动 ActiveMQ,ActiveMQ 服务节点会自动创建所需要的数据表。启动完成后可以去掉这个属性,或者更改 createTablesOnStartup 属性为 false。

下滑线坑:java.lang.lllegalStateException:BeanFactory not initialized or already closed

这是因为您的操作系统的机器名中有 _ 符号。请更改机器名并且重启后即可解决问题。

img

# JDBC Message Store with ActiveMQ Journal

# 说明

这种方式克服了 JDBC Store 的不足,JDBC 每次消息过来,都需要去写库读库。ActiveMQ Journal 使用高速缓存写入技术,大大提高了性能。当消费者的速度能够及时跟上生产者消息的生产速度时,journal 文件能够大大减少需要写入到 DB 中的消息。

举个例子:生产者生产了 1000 条消息,这 1000 条消息会保存到 journal 文件,如果消费者的消费速度很快的情况下,在 journal 文件还没有同步到 DB 之前,消费者已经消费了 90% 的以上消息,那么这个时候只需要同步剩余的 10% 的消息到 DB。如果消费者的速度很慢,这个时候 journal 文件可以使消息以批量方式写到 DB。

为了高性能,这种方式使用日志文件存储 + 数据库存储。先将消息持久到日志文件,等待一段时间再将未消费的消息持久到数据库。该方式要比 JDBC 性能要高。

# 配置

下面是基于上面 JDBC 配置,再做一点修改:

在 /myactiveMQ/apache-activemq-5.15.9/conf 路径下修改 activemq.xml 配置文件,按照如下修改:

<!-- 
<persistenceAdapter>
    <kahaDB directory="${activemq.data}/kahadb"/>
</persistenceAdapter>

<persistenceAdapter>
    <jdbcPersistenceAdapter dataSource="#mysql-ds" createTableOnStartup="true"/>
</persistenceAdapter>
-->

<persistenceFactory>
    <journalPersistenceAdapterFactory
		journalLogFiles="4"
		journalLogFileSize="32768"useJournal="true"
		useQuickJournal="true"dataSource="#mysql-ds"
		dataDirectory="activemq-data" />
</persistenceFactory>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17

就是将其他持久化机制的配置注释掉,然后加上 JDBC Message Store with ActiveMQ Journal 的配置。

# 总结

JDBC 效率低,kahaDB 效率高,JDBC + Journal 效率较高。

持久化消息主要指的是:MQ 所在服务器宕机了消息不会丢试的机制。

持久化机制演变的过程

从最初的 AMQ Message Store 方案到 ActiveMQ V4 版本退出的 High Performance Journal(高性能事务支持)附件,并且同步推出了关于关系型数据库的存储方案。ActiveMQ 5.3 版本又推出了对 KahaDB 的支持(5.4 版本后被作为默认的持久化方案),后来 ActiveMQ 5.8 版本开始支持 LevelDB,到现在 5.9 提供了标准的 Zookeeper + LevelDB 集群化方案。

AMQ 基于普通文件
KahaDB 基于日志文件,从 ActiveMQ 5.4 开始默认使用
JDBC 基于第三方数据库
Replicated LevelDB Store 从 5.9 开始提供了 LevelDB 和 Zookeeper 的数据复制方法,用于 Master-slave 方式的首选数据复制方案
编辑此页 (opens new window)
#中间件
更新时间: 2023/10/23, 10:58:52
ActiveMQ - 传输协议
ActiveMQ - 高级特性

← ActiveMQ - 传输协议 ActiveMQ - 高级特性→

最近更新
01
技术随笔 - Element Plus 修改包名 原创
11-02
02
Reactor - 扩展性
11-02
03
Reactor - 最佳实践
11-02
更多文章>
Theme by Vdoing | Copyright © 2021-2024 Young Kbt | blog
桂ICP备2021009994号
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式