ElasticSearch - 多框架集成
# Spring Data框架集成
Spring Data 是一个用于简化数据库、非关系型数据库、索引库访问,并支持云服务的开源框架。其主要目标是使得对数据的访问变得方便快捷,并支持 map-reduce 框架和云计 算数据服务。Spring Data 可以极大的简化 JPA(Elasticsearch ......)的写法,可以在几乎不用写实现的情况下,实现对数据的访问和操作。除了 CRUD 外,还包括如分页、排序等一些常用的功能。
Spring Data 的官网:https://spring.io/projects/spring-data (opens new window)
Spring Data 常用的功能模块如下:
# Spring Data Elasticsearch介绍
Spring Data Elasticsearch 基于 spring data API 简化 Elasticsearch 操作,将原始操作 Elasticsearch 的客户端 API 进行封装。Spring Data 为 Elasticsearch 项目提供集成搜索引擎。Spring Data Elasticsearch POJO 的关键功能区域为中心的模型与 Elastichsearch 交互文档和轻松地编写一个存储索引库数据访问层。
官方网站: https://spring.io/projects/spring-data-elasticsearch (opens new window)
# Spring Data Elasticsearch版本对比
选择兼容的版本非常重要,我经常因为版本冲突而花费了大量时间调试。
版本对比以官网为主,查看地址:https://docs.spring.io/spring-data/elasticsearch/docs/4.3.0/reference/html/#preface.requirements (opens new window)
我的 Elasticsearch 版本是 7.15.2,所以所有版本选择如下:
# 框架搭建
创建一个 Maven 项目
修改 pom.xml 文件,添加依赖
<!-- Spring Boot --> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.5.5</version> <relativePath/> </parent> <dependencies> <!-- Spring data ES 依赖 --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-elasticsearch</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> </dependencies>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20因为利用 Test 进行测试,所以不需要加入 Web 等依赖,请根据需求自行添加。
添加配置文件,在 resources 目录中增加 application.yml 文件
spring: application: name: es-service
1
2
3注意
新版的 ip 地址和端口号可以在「配置类」里进行配置,无需在配置文件进行配置
2021-11-17 @Young Kbt
旧版的配置文件:
spring: application: name: es-service data: elasticsearch: cluster-nodes: 127.0.0.1:9200 # ES 服务器位置
1
2
3
4
5
6Spring Boot 启动类
package com.kbt; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; /** * @author Young Kbt * @date 2021/11/17 17:24 * @description 启动类 */ @SpringBootApplication public class EsApplication { public static void main(String[] args) { SpringApplication.run(EsApplication.class, args); } }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16添加 ES 配置类,进行地址和端口的配置
package com.kbt.config; import org.apache.http.HttpHost; import org.elasticsearch.client.RestClient; import org.elasticsearch.client.RestClientBuilder; import org.elasticsearch.client.RestHighLevelClient; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.data.elasticsearch.config.AbstractElasticsearchConfiguration; /** * @author Young Kbt * @date 2021/11/17 17:25 * @description ElasticSearch 配置类 */ @Configuration public class ElasticSearchConfig extends AbstractElasticsearchConfiguration { @Override @Bean public RestHighLevelClient elasticsearchClient() { RestClientBuilder builder = RestClient.builder(new HttpHost("127.0.0.1", 9200,"http")); return new RestHighLevelClient(builder); // 如果集群有多个地址 // HttpHost[] httpHosts = new HttpHost[]{ // new HttpHost("127.2.0.1", 9200,"http"), // new HttpHost("127.2.0.2", 9200,"http"), // new HttpHost("127.2.0.3", 9200,"http")}; // RestClientBuilder builder = RestClient.builder(httpHosts); // return new RestHighLevelClient(builder); } }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34package com.kbt.config; import org.apache.http.HttpHost; import org.elasticsearch.client.RestClient; import org.elasticsearch.client.RestClientBuilder; import org.elasticsearch.client.RestHighLevelClient; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.data.elasticsearch.config.AbstractElasticsearchConfiguration; /** * @author Young Kbt * @date 2021/11/17 17:25 * @description ElasticSearch 配置类,来源官网 */ @Configuration public class ElasticSearchConfig extends AbstractElasticsearchConfiguration { @Override @Bean public RestHighLevelClient elasticsearchClient() { final ClientConfiguration clientConfiguration = ClientConfiguration.builder() .connectedTo("localhost:9200") // 多个地址用逗号隔开 .build(); return RestClients.create(clientConfiguration).rest(); } }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29// Make sure to add code blocks to your code group
我这个版本使用最新模板,启动失败,会报错,只能使用原始模板,建议能用最新模板就用它,因为这是官方推荐。
实体类
Spring Data ES支持两种实体映射方案:
- Jackson Object Mapping
- Meta Model Object Mapping
早期的版本默认使用的是 jackson 的方案,但是在
4.x
之后 Meta Model 就上位了,而前者已经不再被支持。所以这里使用第二种方案。我们先定义一个实体类,并通过注解来表明它跟 ES 实体之间的映射关系。package com.kbt.pojo; import org.springframework.data.annotation.Id; import org.springframework.data.elasticsearch.annotations.Document; import org.springframework.data.elasticsearch.annotations.Field; import org.springframework.data.elasticsearch.annotations.FieldType; import org.springframework.data.elasticsearch.annotations.Setting; /** * @author Young Kbt * @date 2021/11/17 17:26 * @description 商品实体类 */ @Document(indexName = "shopping") @Setting(shards = 3, replicas = 1) public class Product { @Id private Integer id; // 商品唯一标识 /** * type: 字段数据类型 * analyzer: 分词器类型 * index: 是否索引(默认:true) * Keyword: 短语,不进行分词 */ @Field(name = "title",type = FieldType.Text) private String title; // 商品名称 @Field(type = FieldType.Keyword) private String category; // 分类名称 @Field(type = FieldType.Double) private Double price; // 商品价格 @Field(type = FieldType.Keyword, index = false) private String images; // 图片地址 public Product() { } // set // get // toString }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40索引叫做
shopping
。请加入 Set 和 Get 和 toString 方法,toString 方法是为了测试期间直接打印在控制台。
类注解:@Document 是文档注解,@Setting 是 ES 配置注解
注解 Elasticsearch 映射中的格式字符串 indexName 索引名称 type 索引类型 shards 分片的数量,默认为 1 replicas 副本的数量,默认为 1 refreshInterval 刷新间隔,默认为 1 s indexStoreType 索引文件存储类型,默认为 fs 属性注解:@Field 用来指定属性的类型
注解 Elasticsearch 映射中的格式字符串 analyzer 指定分词器,Es 中默认使用的标准分词器,比如我们需要指定中文 IK 分词器,可以指定值为 ik_max_word name 字段名称,可以在 ES 文档中表示,若未设置,则使用 Java 字段名称。 index 指定该词是否需要索引,默认为 true type 指定该属性在 ES 中的类型,其中的值是 FileType 类型的值,比如 FileType.Text 类型对应 ES 中的 text 类型 store 指定该属性内容是否存储在 ES,默认为 false fielddata 指定该属性能否进行排序,因为 ES 中的 text 类型是不能进行排序(已经分词了) searchAnalyzer 指定搜索使用的分词器 日期格式映射
注解 Elasticsearch 映射中的格式字符串 @Field(type=FieldType.Date) date_optional_timeepoch_millis @Field(type=FieldType.Date, format=DateFormat.basic_date) 基本日期 @Field(type=FieldType.Date, format={DateFormat.basic_date, DateFormat.basic_time}) 基本日期 || 基本时间 @Field(type=FieldType.Date, pattern="dd.MM.uuuu") date_optional_time || epoch_millis || dd.MM.uuu @Field(type=FieldType.Date, format={}, pattern="dd.MM.uuuu") dd.MM.uuuu
# 封装类测试
这里先演示使用 ES 封装类进行测试,比较复杂,后面还有更加简单的Repository类原生测试
Spring Data Elasticsearch 中,可以使用 SearchOperations 工具执行一些更复杂的查询,这些查询操作接收一个 Query 对象封装的查询操作。
Spring Data Elasticsearch 中的 Query 有三种:
- CriteriaQuery
- StringQuery
- NativeSearchQuery
NativeSearchQuery 可以满足我们的查询,如果是字段查询或者范围查询,需要用到 CriteriaQuery。
插入数据时,如果索引不存在,自动会创建索引
package com.kbt;
import com.kbt.pojo.Product;
import org.elasticsearch.index.query.MatchAllQueryBuilder;
import org.elasticsearch.index.query.TermQueryBuilder;
import org.elasticsearch.search.fetch.subphase.highlight.HighlightBuilder;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.data.domain.PageRequest;
import org.springframework.data.domain.Sort;
import org.springframework.data.elasticsearch.core.ElasticsearchOperations;
import org.springframework.data.elasticsearch.core.SearchHit;
import org.springframework.data.elasticsearch.core.SearchHits;
import org.springframework.data.elasticsearch.core.SearchOperations;
import org.springframework.data.elasticsearch.core.mapping.IndexCoordinates;
import org.springframework.data.elasticsearch.core.query.IndexQuery;
import org.springframework.data.elasticsearch.core.query.IndexQueryBuilder;
import org.springframework.data.elasticsearch.core.query.NativeSearchQuery;
import org.springframework.data.elasticsearch.core.query.NativeSearchQueryBuilder;
import java.util.List;
/**
* @author Young Kbt
* @date 2021/11/17 19:59
* @description ES 商品测试类
*/
@SpringBootTest
public class ESProductTest {
@Autowired
private ElasticsearchOperations esOperations;
@Autowired
private SearchOperations searchOperations;
/**
* 插入数据
*/
@Test
public void save() {
Product product = new Product();
product.setId(1);
product.setTitle("华为手机1");
product.setCategory("手机");
product.setPrice(2999.0);
product.setImages("https://cdn.jsdelivr.net/gh/Kele-Bingtang/static/user/avatar2.png");
// indexCoordinates 是 4.x 新增的一个参数,通过这个参数我们可以再操作ES的时候同时指定多个 index。
IndexCoordinates indexCoordinatesFor = esOperations.getIndexCoordinatesFor(product.getClass());
// 为索引设置数据
IndexQuery indexQuery = new IndexQueryBuilder()
.withId(product.getId().toString())
.withObject(product).build();
// 插入索引和其数据,返回索引值
String index = esOperations.index(indexQuery, indexCoordinatesFor);
System.out.println("插入的索引:" + index);
}
/**
* 查询全部数据
*/
@Test
public void queryAll() {
// 查询所有数据
MatchAllQueryBuilder allQueryBuilder = new MatchAllQueryBuilder();
// 查询参数配置
NativeSearchQuery searchQuery = new NativeSearchQueryBuilder()
.withQuery(allQueryBuilder)
.build();
SearchHits<Product> searchHits = searchOperations.search(searchQuery, Product.class);
// 获取查询的结果命中总数
long count = searchHits.getTotalHits();
System.out.println("查询的总数:" + count);
List<SearchHit<Product>> list = searchHits.getSearchHits();
System.out.println("查询的数据:");
// 遍历查询的数据
for (SearchHit<Product> hit : list) {
System.out.println(hit);
}
}
/**
* 词条查询
*/
@Test
public void termQuery() {
// 指定名字查询
TermQueryBuilder termQueryBuilder = new TermQueryBuilder("category", "手机");
// 查询参数配置
NativeSearchQuery searchQuery = new NativeSearchQueryBuilder()
.withQuery(termQueryBuilder)
.build();
SearchHits<Product> searchHits = searchOperations.search(searchQuery, Product.class);
// 获取查询的结果命中总数
long count = searchHits.getTotalHits();
System.out.println("查询的总数:" + count);
List<SearchHit<Product>> list = searchHits.getSearchHits();
System.out.println("查询的数据:");
// 遍历查询的数据
for (SearchHit<Product> hit : list) {
System.out.println(hit);
}
}
/**
* 分页排序查询
*/
@Test
public void sortPageQuery() {
// 查询所有数据
MatchAllQueryBuilder allQueryBuilder = new MatchAllQueryBuilder();
//设置排序(排序方式,正序还是倒序,排序的 id)
Sort sort = Sort.by(Sort.Direction.DESC,"id");
int currentPage = 0;//当前页,第一页从 0 开始,1 表示第二页
int pageSize = 3;//每页显示多少条
//设置查询分页
PageRequest pageRequest = PageRequest.of(currentPage, pageSize,sort);
// 查询参数配置
NativeSearchQuery searchQuery = new NativeSearchQueryBuilder()
.withQuery(allQueryBuilder)
.withPageable(pageRequest)
.build();
SearchHits<Product> searchHits = searchOperations.search(searchQuery, Product.class);
List<SearchHit<Product>> list = searchHits.getSearchHits();
System.out.println("查询的数据:");
// 遍历查询的数据
for (SearchHit<Product> hit : list) {
System.out.println(hit);
}
}
/**
* 高亮查询
*/
@Test
public void highLightQuery() {
// 查询所有数据
MatchAllQueryBuilder allQueryBuilder = new MatchAllQueryBuilder();
// 查询参数配置
NativeSearchQuery searchQuery = new NativeSearchQueryBuilder()
.withQuery(allQueryBuilder)
.withHighlightBuilder(new HighlightBuilder().field("price").preTags("<前缀>").postTags("</后缀>"))
.build();
SearchHits<Product> searchHits = searchOperations.search(searchQuery, Product.class);
List<SearchHit<Product>> list = searchHits.getSearchHits();
System.out.println("查询的数据:");
// 遍历查询的数据
for (SearchHit<Product> hit : list) {
System.out.println(hit);
}
}
/**
* 范围查询
*/
@Test
public void searchByPrice() {
Criteria criteria = new Criteria("price").between(2000.0, 2500.0);
CriteriaQuery criteriaQuery = new CriteriaQuery(criteria);
SearchHits<Product> search = searchOperations.search(criteriaQuery, Product.class);
List<SearchHit<Product>> searchHits = search.getSearchHits();
for (SearchHit<Product> searchHit : searchHits) {
System.out.println(searchHit);
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
@SpringBootTest
public class ESProductTest {
@Autowired
private ElasticsearchOperations esOperations;
@Autowired
private SearchOperations searchOperations;
/**
* 插入数据
*/
@Test
public void save() {
Product product = new Product();
product.setId(1);
product.setTitle("华为手机1");
product.setCategory("手机");
product.setPrice(2999.0);
product.setImages("https://cdn.jsdelivr.net/gh/Kele-Bingtang/static/user/avatar2.png");
// indexCoordinates 是 4.x 新增的一个参数,通过这个参数我们可以再操作ES的时候同时指定多个 index。
IndexCoordinates indexCoordinatesFor = esOperations.getIndexCoordinatesFor(product.getClass());
// 为索引设置数据
IndexQuery indexQuery = new IndexQueryBuilder()
.withId(product.getId().toString())
.withObject(product).build();
// 插入索引和其数据,返回索引值
String index = esOperations.index(indexQuery, indexCoordinatesFor);
System.out.println("插入的索引:" + index);
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
@SpringBootTest
public class ESProductTest {
@Autowired
private ElasticsearchOperations esOperations;
@Autowired
private SearchOperations searchOperations;
/**
* 查询全部数据
*/
@Test
public void queryAll() {
// 查询所有数据
MatchAllQueryBuilder allQueryBuilder = new MatchAllQueryBuilder();
NativeSearchQuery searchQuery = new NativeSearchQueryBuilder()
.withQuery(allQueryBuilder)
.build();
SearchHits<Product> searchHits = searchOperations.search(searchQuery, Product.class);
// 获取查询的结果命中总数
long count = searchHits.getTotalHits();
System.out.println("查询的总数:" + count);
List<SearchHit<Product>> list = searchHits.getSearchHits();
System.out.println("查询的数据:");
// 遍历查询的数据
for (SearchHit<Product> hit : list) {
System.out.println(hit);
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
@SpringBootTest
public class ESProductTest {
@Autowired
private ElasticsearchOperations esOperations;
@Autowired
private SearchOperations searchOperations;
/**
* 词条查询
*/
@Test
public void termQuery() {
// 指定名字查询
TermQueryBuilder termQueryBuilder = new TermQueryBuilder("category", "手机");
NativeSearchQuery searchQuery = new NativeSearchQueryBuilder()
.withQuery(termQueryBuilder)
.build();
SearchHits<Product> searchHits = searchOperations.search(searchQuery, Product.class);
// 获取查询的结果命中总数
long count = searchHits.getTotalHits();
System.out.println("查询的总数:" + count);
List<SearchHit<Product>> list = searchHits.getSearchHits();
System.out.println("查询的数据:");
// 遍历查询的数据
for (SearchHit<Product> hit : list) {
System.out.println(hit);
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
@SpringBootTest
public class ESProductTest {
@Autowired
private ElasticsearchOperations esOperations;
@Autowired
private SearchOperations searchOperations;
/**
* 分页排序查询
*/
@Test
public void sortPageQuery() {
// 查询所有数据
MatchAllQueryBuilder allQueryBuilder = new MatchAllQueryBuilder();
//设置排序(排序方式,正序还是倒序,排序的 id)
Sort sort = Sort.by(Sort.Direction.DESC,"id");
int currentPage = 0;//当前页,第一页从 0 开始,1 表示第二页
int pageSize = 3;//每页显示多少条
//设置查询分页
PageRequest pageRequest = PageRequest.of(currentPage, pageSize,sort);
NativeSearchQuery searchQuery = new NativeSearchQueryBuilder()
.withQuery(allQueryBuilder)
.withPageable(pageRequest)
.build();
SearchHits<Product> searchHits = searchOperations.search(searchQuery, Product.class);
List<SearchHit<Product>> list = searchHits.getSearchHits();
System.out.println("查询的数据:");
// 遍历查询的数据
for (SearchHit<Product> hit : list) {
System.out.println(hit);
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
@SpringBootTest
public class ESProductTest {
@Autowired
private ElasticsearchOperations esOperations;
@Autowired
private SearchOperations searchOperations;
/**
* 高亮查询
*/
@Test
public void highLightQuery() {
// 查询所有数据
MatchAllQueryBuilder allQueryBuilder = new MatchAllQueryBuilder();
NativeSearchQuery searchQuery = new NativeSearchQueryBuilder()
.withQuery(allQueryBuilder)
.withHighlightBuilder(new HighlightBuilder().field("price").preTags("<前缀>").postTags("</后缀>"))
.build();
SearchHits<Product> searchHits = searchOperations.search(searchQuery, Product.class);
List<SearchHit<Product>> list = searchHits.getSearchHits();
System.out.println("查询的数据:");
// 遍历查询的数据
for (SearchHit<Product> hit : list) {
System.out.println(hit);
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
@SpringBootTest
public class ESProductTest {
@Autowired
private ElasticsearchOperations esOperations;
@Autowired
private SearchOperations searchOperations;
/**
* 范围查询
*/
@Test
public void searchByPrice() {
Criteria criteria = new Criteria("price");
CriteriaQuery criteriaQuery = new CriteriaQuery(criteria);
SearchHits<Product> search = searchOperations.search(criteriaQuery, Product.class);
List<SearchHit<Product>> searchHits = search.getSearchHits();
for (SearchHit<Product> searchHit : searchHits) {
System.out.println(searchHit);
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
@SpringBootTest
public class ESProductTest {
@Autowired
private ElasticsearchOperations esOperations;
@Autowired
private SearchOperations searchOperations;
/**
* 范围查询
*/
@Test
public void searchByPrice() {
Criteria criteria = new Criteria("price").between(2000.0, 2500.0);
CriteriaQuery criteriaQuery = new CriteriaQuery(criteria);
SearchHits<Product> search = searchOperations.search(criteriaQuery, Product.class);
List<SearchHit<Product>> searchHits = search.getSearchHits();
for (SearchHit<Product> searchHit : searchHits) {
System.out.println(searchHit);
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
// Make sure to add code blocks to your code group
由代码可以看出:
NativeSearchQueryBuilder
类可以处理很多东西,包括查询、分页、高亮、排序等,它拥有丰富的withXxx
方法,基本满足关于 ES 的操作- 如果是范围查询,需要用到
Criteria
类进行范围设置,当然该类也可以直接查询某个字段,不带范围的查询就是字段查询
解释:
ElasticsearchOperations
是 Spring Data ES 操作 ES 的一个接口,在 4.x 的版本它的默认实现类是ElasticsearchRestTemplate
,你也可以直接使用ElasticsearchRestTemplate
替换掉ElasticsearchOperations
,因为接口和其实现类都一样,使用接口可以实现多态ElasticsearchOperations
继承SearchOperations
,SearchOperations
内的方法趋向于搜索,ElasticsearchOperations
因为继承,所以拥有一些SearchOperations
的搜索方法,自身也有一些索引的操作方法。这里我分开使用
番外:
- ElasticsearchRestTemplate 是 Spring-Data-Elasticsearch 项目中的一个类,和其他 Spring 项目中的 Template 类似
- 选择 ElasticsearchRestTemplate 代替了原来的 ElasticsearchTemplate,原因是 ElasticsearchTemplate 基于 TransportClient,TransportClient 即将在 8.x 以后的版本中移除
- ElasticsearchRestTemplate 基于 RestHighLevelClient 客户端的。需要自定义配置类,继承 AbstractElasticsearchConfiguration,并实现 elasticsearchClient() 抽象方法,创建 RestHighLevelClient 对象
# Repository类原生测试
用过 Spring Data 的人都知道,它里面有个核心的概念叫 Repository
。
Repository 接口是 Spring Data 的一个核心接口,它不提供任何方法,开发者需要在自己定义的接口中声明需要的方法
public interface Repository<T, ID extends Serializable> { }
Spring Data 可以让我们只定义接口,只要遵循 Spring Data 的规范,就无需写实现类。Repository 有几个常用的子接口:
- CrudRepository: 继承 Repository,实现了一组 CRUD 相关的方法
- PagingAndSortingRepository: 继承 CrudRepository,实现了一组分页排序相关的方法
- JpaRepository: 继承 PagingAndSortingRepository,实现一组 JPA 规范相关的方法
ElasticSearch 也有一个 Repository
接口:ElasticsearchRepository
,该接口已经内置一些基本的增删改查方法,我们只需要自定义一个类,继承它,就可以使用方法了,当然,它只能进行一些简单的操作,复杂的操作需要自定义实现,也不难,因为官网提供了规范模板。
创建一个类继承 ElasticsearchRepository
接口
package com.kbt.repository;
import com.kbt.pojo.Product;
import org.springframework.data.elasticsearch.repository.ElasticsearchRepository;
import java.util.List;
/**
* @author Young Kbt
* @date 2021/11/17 18:49
* @description 基础 ES 封装类,自带很多方法
* Product:索引的实体类
* Integer:主键 ID 类型
*/
public interface ProductRepository extends ElasticsearchRepository<Product,Integer> {
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
该接口是一个泛型,第一个参数是索引的实体类,第二个参数是主键 Id 的类型,用过 mybatis-plus 应该非常熟悉接下来的操作。
测试代码:
package com.kbt;
import com.kbt.repository.ProductRepository;
import com.kbt.pojo.Product;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.PageRequest;
import org.springframework.data.domain.Sort;
import java.util.ArrayList;
import java.util.List;
/**
* @author Young Kbt
* @date 2021/11/17 18:51
* @description ES Repository 测试类
*/
@SpringBootTest
public class ESProductRepositoryTest {
@Autowired
private ProductRepository productRepository;
/**
* 新增
*/
@Test
public void save(){
Product product = new Product();
product.setId(1);
product.setTitle("华为手机");
product.setCategory("手机");
product.setPrice(2999.0);
product.setImages("https://cdn.jsdelivr.net/gh/Kele-Bingtang/static/user/avatar2.png");
productRepository.save(product);
}
/**
* 修改
*/
@Test
public void update(){
Product product = new Product();
product.setId(1);
product.setTitle("小米 2 手机");
product.setCategory("手机");
product.setPrice(9999.0);
product.setImages("https://cdn.jsdelivr.net/gh/Kele-Bingtang/static/user/avatar2.png");
productRepository.save(product);
}
/**
* 根据 id 查询
*/
@Test
public void findById(){
Product product = productRepository.findById(1).get();
System.out.println(product);
}
/**
* 查询所有
*/
@Test
public void findAll(){
Iterable<Product> products = productRepository.findAll();
for (Product product : products) {
System.out.println(product);
}
}
/**
* 删除
*/
@Test
public void delete(){
Product product = new Product();
product.setId(1);
productRepository.delete(product);
}
/**
* 批量新增
*/
@Test
public void saveAll(){
List<Product> productList = new ArrayList<>();
for (int i = 0; i < 10; i++) {
Product product = new Product();
product.setId(i);
product.setTitle("["+i+"]小米手机");
product.setCategory("手机");
product.setPrice(1999.0+i);
product.setImages("http://www.youngkbt/xm.jpg");
productList.add(product);
}
productRepository.saveAll(productList);
}
/**
* 分页查询
*/
@Test
public void findByPageable(){
//设置排序(排序方式,正序还是倒序,排序的 id)
Sort sort = Sort.by(Sort.Direction.DESC,"id");
int currentPage = 0; //当前页,第一页从 0 开始,1 表示第二页
int pageSize = 5; //每页显示多少条
//设置查询分页
PageRequest pageRequest = PageRequest.of(currentPage, pageSize,sort);
//分页查询
Page<Product> productPage = productRepository.findAll(pageRequest);
for (Product Product : productPage.getContent()) {
System.out.println(Product);
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
@SpringBootTest
public class ESProductRepositoryTest {
@Autowired
private ProductRepository productRepository;
/**
* 新增数据
*/
@Test
public void save(){
Product product = new Product();
product.setId(1);
product.setTitle("华为手机");
product.setCategory("手机");
product.setPrice(2999.0);
product.setImages("https://cdn.jsdelivr.net/gh/Kele-Bingtang/static/user/avatar2.png");
productRepository.save(product);
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
@SpringBootTest
public class ESProductRepositoryTest {
@Autowired
private ProductRepository productRepository;
/**
* 修改数据
*/
@Test
public void update(){
Product product = new Product();
product.setId(1);
product.setTitle("小米 2 手机");
product.setCategory("手机");
product.setPrice(9999.0);
product.setImages("https://cdn.jsdelivr.net/gh/Kele-Bingtang/static/user/avatar2.png");
productRepository.save(product);
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
@SpringBootTest
public class ESProductRepositoryTest {
@Autowired
private ProductRepository productRepository;
/**
* 根据 id 查询数据
*/
@Test
public void findById(){
Product product = productRepository.findById(1).get();
System.out.println(product);
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
@SpringBootTest
public class ESProductRepositoryTest {
@Autowired
private ProductRepository productRepository;
/**
* 查询所有数据
*/
@Test
public void findAll(){
Iterable<Product> products = productRepository.findAll();
for (Product product : products) {
System.out.println(product);
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
@SpringBootTest
public class ESProductRepositoryTest {
@Autowired
private ProductRepository productRepository;
/**
* 删除数据
*/
@Test
public void delete(){
Product product = new Product();
product.setId(1);
productRepository.delete(product);
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@SpringBootTest
public class ESProductRepositoryTest {
@Autowired
private ProductRepository productRepository;
/**
* 批量新增数据
*/
@Test
public void saveAll(){
List<Product> productList = new ArrayList<>();
for (int i = 0; i < 10; i++) {
Product product = new Product();
product.setId(i);
product.setTitle("["+i+"]小米手机");
product.setCategory("手机");
product.setPrice(1999.0+i);
product.setImages("http://www.youngkbt/xm.jpg");
productList.add(product);
}
productRepository.saveAll(productList);
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
@SpringBootTest
public class ESProductRepositoryTest {
@Autowired
private ProductRepository productRepository;
/**
* 分页查询数据
*/
@Test
public void findByPageable(){
//设置排序(排序方式,正序还是倒序,排序的 id)
Sort sort = Sort.by(Sort.Direction.DESC,"id");
int currentPage = 0; //当前页,第一页从 0 开始,1 表示第二页
int pageSize = 5; //每页显示多少条
//设置查询分页
PageRequest pageRequest = PageRequest.of(currentPage, pageSize,sort);
//分页查询
Page<Product> productPage = productRepository.findAll(pageRequest);
for (Product Product : productPage.getContent()) {
System.out.println(Product);
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// Make sure to add code blocks to your code group
# Repository类自定义测试
Spring Data 的 Repository 接口提供了一种声明式的数据操作规范,无序编写任何代码,只需遵循 Spring Data 的方法定义规范即可完成数据的操作。
ES 的 Repository 只有基本的增删改查操作,没有聚合查询,所以需要使用官方提供的自定义方法。并且官方有命名规范
自定义数据操作方法需要遵循 Repository 规范,示例如下:
关键词 | 方法名 | Elasticsearch 查询字符串 |
---|---|---|
And | findByNameAndPrice | { "query" : { "bool" : { "must" : [ { "query_string" : { "query" : "?", "fields" : [ "name" ] } }, { "query_string" : { "query" : "?", "fields" : [ "price" ] } } ] } }} |
Or | findByNameOrPrice | { "query" : { "bool" : { "should" : [ { "query_string" : { "query" : "?", "fields" : [ "name" ] } }, { "query_string" : { "query" : "?", "fields" : [ "price" ] } } ] } }} |
Is | findByName | { "query" : { "bool" : { "must" : [ { "query_string" : { "query" : "?", "fields" : [ "name" ] } } ] } }} |
Not | findByNameNot | { "query" : { "bool" : { "must_not" : [ { "query_string" : { "query" : "?", "fields" : [ "name" ] } } ] } }} |
Between | findByPriceBetween | { "query" : { "bool" : { "must" : [ {"range" : {"price" : {"from" : ?, "to" : ?, "include_lower" : true, "include_upper" : true } } } ] } }} |
LessThan | findByPriceLessThan | { "query" : { "bool" : { "must" : [ {"range" : {"price" : {"from" : null, "to" : ?, "include_lower" : true, "include_upper" : false } } } ] } }} |
LessThanEqual | findByPriceLessThanEqual | { "query" : { "bool" : { "must" : [ {"range" : {"price" : {"from" : null, "to" : ?, "include_lower" : true, "include_upper" : true } } } ] } }} |
GreaterThan | findByPriceGreaterThan | { "query" : { "bool" : { "must" : [ {"range" : {"price" : {"from" : ?, "to" : null, "include_lower" : false, "include_upper" : true } } } ] } }} |
GreaterThanEqual | findByPriceGreaterThan | { "query" : { "bool" : { "must" : [ {"range" : {"price" : {"from" : ?, "to" : null, "include_lower" : true, "include_upper" : true } } } ] } }} |
Before | findByPriceBefore | { "query" : { "bool" : { "must" : [ {"range" : {"price" : {"from" : null, "to" : ?, "include_lower" : true, "include_upper" : true } } } ] } }} |
After | findByPriceAfter | { "query" : { "bool" : { "must" : [ {"range" : {"price" : {"from" : ?, "to" : null, "include_lower" : true, "include_upper" : true } } } ] } }} |
Like | findByNameLike | { "query" : { "bool" : { "must" : [ { "query_string" : { "query" : "?*", "fields" : [ "name" ] }, "analyze_wildcard": true } ] } }} |
StartingWith | findByNameStartingWith | { "query" : { "bool" : { "must" : [ { "query_string" : { "query" : "?*", "fields" : [ "name" ] }, "analyze_wildcard": true } ] } }} |
EndingWith | findByNameEndingWith | { "query" : { "bool" : { "must" : [ { "query_string" : { "query" : "*?", "fields" : [ "name" ] }, "analyze_wildcard": true } ] } }} |
Contains/Containing | findByNameContaining | { "query" : { "bool" : { "must" : [ { "query_string" : { "query" : "?", "fields" : [ "name" ] }, "analyze_wildcard": true } ] } }} |
In | findByNameIn(Collectionnames) | { "query" : { "bool" : { "must" : [ {"bool" : {"must" : [ {"terms" : {"name" : ["?","?"]}} ] } } ] } }} |
NotIn | findByNameNotIn(Collectionnames) | { "query" : { "bool" : { "must" : [ {"bool" : {"must_not" : [ {"terms" : {"name" : ["?","?"]}} ] } } ] } }} |
Near | findByStoreNear | Not Supported Yet !(不支持了) |
True | findByAvailableTrue | { "query" : { "bool" : { "must" : [ { "query_string" : { "query" : "true", "fields" : [ "available" ] } } ] } }} |
False | findByAvailableFalse | { "query" : { "bool" : { "must" : [ { "query_string" : { "query" : "false", "fields" : [ "available" ] } } ] } }} |
OrderBy | findByAvailableTrueOrderByNameDesc | { "query" : { "bool" : { "must" : [ { "query_string" : { "query" : "true", "fields" : [ "available" ] } } ] } }, "sort":[{"name":{"order":"desc"}}] } |
看不懂什么意思吗?下面来看看示例,然后回头看表格就明白了。
需求 1:不想通过 id 查询 数据,想通过其他字段如 category 查询数据
需求2:既想查询商品分类名 category,又想查询商品价格 price 的数据
首先在自定义的 Repository
类代码按照规范写代码:
public interface ProductRepository extends ElasticsearchRepository<Product,Integer> {
// 需求 1
List<Product> findProductByCategory(String category);
// 需求 2
List<Product> findProductByCategoryAndPrice(String category, Double price);
}
2
3
4
5
6
可以看出根据规范,有 findXxxByXxx、findXxxByXxxAndXxx、findXxxByXxxOrXxx等格式,其实用 Ideal 工具,输入 find,它就会弹出很多的提示信息。
笔记
只需要写接口方法,不需要写这些方法实现,因为 ES 已经写好了,只不过没有放在 ElasticsearchRepository
接口,只需要我们自定义写出来。
2021-11-17 @Young Kbt
测试类:
package com.kbt;
import com.kbt.repository.ProductRepository;
import com.kbt.pojo.Product;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import java.util.List;
/**
* @author Young Kbt
* @date 2021/11/17 19:07
* @description ES 自定义 Repository 测试类
*/
@SpringBootTest
public class ESSearchTest {
@Autowired
private ProductRepository productRepository;
/**
* 通过某个字段查询数据
*/
@Test
public void docQuery(){
List<Product> productList = productRepository.findProductByCategory("手机");
for (Product product : productList) {
System.out.println(product);
}
}
/**
* term 查询
*/
@Test
public void termQuery(){
List<Product> productList = productRepository.findProductByCategoryAndPrice("手机", 2004.0);
for (Product p : productList) {
System.out.println(p);
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
# 总结
封装类的逻辑更趋向于原生 API,需要过硬的逻辑步骤去完成 ES 操作。
Repository 类完全依赖框架,简单的代码就可以实现丰富的操作,但是没有过多的逻辑,容易变成 CURD 工程师,即根本不知道内部的逻辑和原理。
# 案例分析
DDL
按照如下的 DDL 语句编写代码:
GET /office/_search
{
"query": {
"bool": {
"must": [
{
"match": {
"skuTitle": "华为"
}
}
],
"filter": [
{
"term": {
"catalogId": "120"
}
},
{
"terms": {
"branchId": [
"120",
"112"
]
}
},
{
"nested": {
"path": "attrs",
"query": {
"bool": {
"must": [
{
"term": {
"attrs.attrId": "15"
}
},
{
"terms": {
"attrs.attrValue": [
"华为",
"海思"
]
}
}
]
}
}
}
},
{
"term": {
"hasStock": "0"
}
},
{
"range": {
"skiPrice": {
"gte": 10,
"lte": 20
}
}
}
]
}
},
"aggs": {
"brand_agg": {
"terms": {
"field": "brandId",
"size": 10
},
"aggs": {
"brand_name_agg": {
"terms": {
"field": "brandName",
"size": 10
}
},
"brand_img_agg": {
"terms": {
"field": "brandImg",
"size": 10
}
}
}
},
"catalog_agg": {
"terms": {
"field": "catalogId",
"size": 10
},
"aggs": {
"catalog_name_agg": {
"terms": {
"field": "catalogName",
"size": 10
}
}
}
},
"attr_agg": {
"nested": {
"path": "attrs"
},
"aggs": {
"attr_id_agg": {
"terms": {
"field": "attrs.attr_id",
"size": 10
},
"aggs": {
"attr_name_agg": {
"terms": {
"field": "attrs.attrName",
"size": 10
}
},
"attr_value_agg": {
"terms": {
"field": "attrs.attrValue",
"size": 10
}
}
}
}
}
}
},
"sort": [
{
"age": {
"order": "desc"
}
}
],
"highlight": {
"pre_tags": "<b style='color: red'>",
"post_tags": "</b>",
"fields": {
"skuTitle": {}
}
},
"from": 0,
"size": 1
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
Java 代码:
public void search() {
NativeSearchQueryBuilder nativeSearchQueryBuilder = new NativeSearchQueryBuilder();
/**
* 查询
*/
BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
boolQueryBuilder.must(QueryBuilders.matchQuery("skuTitle", "华为"));
boolQueryBuilder.filter(QueryBuilders.termQuery("catalogId", "120"));
boolQueryBuilder.filter(QueryBuilders.termsQuery("brandId", "120", "112"));
// nest 查询
BoolQueryBuilder nestBoolQuery = QueryBuilders.boolQuery();
nestBoolQuery.must(QueryBuilders.termQuery("attrs.attrId", "18"));
nestBoolQuery.must(QueryBuilders.termsQuery("attrs.attrValue", "华为", "海思"));
boolQueryBuilder.filter(QueryBuilders.nestedQuery("attr", nestBoolQuery, ScoreMode.None));
boolQueryBuilder.filter(QueryBuilders.termQuery("hashStock", "0"));
boolQueryBuilder.filter(QueryBuilders.rangeQuery("skuPrice").gte(10).lt(20));
nativeSearchQueryBuilder.withQuery(boolQueryBuilder);
/**
* 排序、分页、高亮
*/
nativeSearchQueryBuilder.withSorts(new FieldSortBuilder("age").order(SortOrder.DESC));
nativeSearchQueryBuilder.withPageable(PageRequest.of(0, 20));
nativeSearchQueryBuilder.withHighlightBuilder(new HighlightBuilder().field("skuTitle").preTags("<b style='color: red'>").postTags("</b>"));
/**
* 聚合
*/
TermsAggregationBuilder brandAgg = AggregationBuilders.terms("brand_agg").field("brandId").size(10);
TermsAggregationBuilder brandName = AggregationBuilders.terms("brand_name_agg").field("brandName").size(1010);
TermsAggregationBuilder brandImg = AggregationBuilders.terms("brand_img_agg").field("brandImg").size(10);
// brandAgg 子聚合
brandAgg.subAggregation(brandName).subAggregation(brandImg);
TermsAggregationBuilder catalogId = AggregationBuilders.terms("catalog_agg").field("catalogId").size(10);
TermsAggregationBuilder catalogName = AggregationBuilders.terms("brand_name_agg").field("catalogName").size(10);
catalogId.subAggregation(catalogName);
// nest 聚合
NestedAggregationBuilder nestedAgg = AggregationBuilders.nested("attr_agg", "attrs");
TermsAggregationBuilder attrId = AggregationBuilders.terms("attr_id_agg").field("attrs.attrId").size(10);
TermsAggregationBuilder attrName = AggregationBuilders.terms("attr_name_agg").field("attrs.attrName").size(10);
TermsAggregationBuilder attrValue = AggregationBuilders.terms("attr_value_agg").field("attrs.attrValue").size(10);
attrId.subAggregation(attrName).subAggregation(attrValue);
nestedAgg.subAggregation(attrId);
nativeSearchQueryBuilder.withAggregations(brandAgg, catalogId, nestedAgg);
SearchHits<Example> search = elasticsearchOperations.search(nativeSearchQueryBuilder.build(), Example.class);
System.out.println("search:" + search);
for (SearchHit<Example> searchHit : search.getSearchHits()) {
System.out.println(searchHit);
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
# Spring Boot框架集成
# Spring Boot介绍
Spring 是一个开源框架,Spring 是于 2003 年兴起的一个轻量级的 Java 开发框架,由 Rod Johnson 在其著作《Expert One-On-One J2EE Development and Design》。Spring 是为了解决企业级应用开发的复杂性而创建的,使用 Spring 可以让简单的 JavaBean 实现之前只有 EJB 才能完成的事情。但是 Spring 不仅仅局限于服务器端开发,任何 Java 应用都能在简单性、可测试性和松耦合性等方面从 Spring 中获益。
Spring Boot 是所有基于 Spring 开发的项目的起点。Spring Boot 的设计是为了让你尽可能快的跑起来 Spring 应用程序并且尽可能减少你的配置文件。简单来说就是 Spring Boot 其实不是什么新的框架,它默认配置了很多框架的使用方式,就像 Maven 整合了大部分的 jar 包,Spring Boot 整合了大部分的框架
# SpringBoot的特点
为基于 Spring 的开发提供更快的入门体验
开箱即用,没有代码生成,也无需 XML 配置。同时也可以修改默认值来满足特定的需求
提供了一些大型项目中常见的非功能性特性,如嵌入式服务器、安全、指标,健康检测、外部配置等
SpringBoot 不是对 Spring 功能上的增强,而是提供了一种快速使用 Spring 的方式
# Data框架和Boot框架集成区别
其实 Spring Data 框架集成 中,已经使用了 Spring Boot,本内容的框架集成指的是单纯将 Spring Boot和 ElasticSearch 整合,没有用到 Spring Data 框架的 Repository 以及 ElasticsearchOperations 接口。
两个框架集成的 区别 就在于 Spring Data 对于 ES 的操作封装更加彻底,实体类可以写注解进行映射和配置索引名等,并且支持 Spring Boot。而 Spring Boot 框架集成则是将 Spring Boot 框架和 ES 的 Java API 进行集成。不懂 Java API 的可以去看 ElasticSearch - Java操作
两个框架集成的 选择 建议选择 Spring Data 框架集成,它封装的更加彻底,让你几行代码就能执行 ES 的大部分操作,当然,因为操作过于简单,它牺牲了步骤逻辑。如果想了解步骤逻辑,可以使用 Spring Boot 框架集成,虽然使用繁琐,代码步骤重复,但是操作起来和 Kibana 、 Postman 的操作逻辑更加吻合。
总结:Spring Data 框架集成操作简单,快速开发,牺牲步骤逻辑,可使用 Spring Boot 框架集成进行辅助开发,类似于 Mybatis
;Spring Boot 框架集成操作繁琐,逻辑步骤重复,类似于原生 JDBC
。
# 框架搭建
新建一个 Maven 项目
修改 pom 文件,增加依赖
<dependencies> <!--ES 客户端--> <dependency> <groupId>org.elasticsearch.client</groupId> <artifactId>elasticsearch-rest-high-level-client</artifactId> <version>7.15.2</version> <exclusions> <exclusion> <groupId>org.elasticsearch</groupId> <artifactId>elasticsearch</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>org.elasticsearch</groupId> <artifactId>elasticsearch</artifactId> <version>7.15.2</version> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter</artifactId> <version>2.5.5</version> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> <version>2.5.5</version> </dependency> <dependency> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-databind</artifactId> <version>2.9.3</version> </dependency> </dependencies>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36ES 客户端的依赖版本要和 ES 服务器版本一致
添加配置文件,在 resources 目录中增加 application.yml 文件
spring: application: name: es-springboot elasticsearch: host-list: 127.0.0.1:9200 # 多个节点用逗号分隔
1
2
3
4
5ES 配置类,进行地址和端口的配置
package com.kbt.config; import org.apache.http.HttpHost; import org.elasticsearch.client.RestClient; import org.elasticsearch.client.RestHighLevelClient; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * @author Young Kbt * @date 2021/11/17 17:33 * @description ElasticSearch 配置类 */ @Configuration @ConfigurationProperties(prefix = "elasticsearch") public class ElasticSearchConfig { @Value("${elasticsearch.host-list}") private String hostList; // 关闭数据源 @Bean(destroyMethod = "close") // 遍历获取 application.yml 文件的地址端口,逗号隔开 public RestHighLevelClient restHighLevelClient(){ String[] split = hostList.split(","); HttpHost[] httpHostsArray = new HttpHost[split.length]; for (int i = 0; i < split.length; i++) { String item = split[i]; httpHostsArray[i] = new HttpHost(item.split(":")[0],Integer.parseInt(item.split(":")[1]),"http"); } return new RestHighLevelClient(RestClient.builder(httpHostsArray)); } }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
两个框架搭建区别
增加 application.yml 文件和添加 ES 配置类完全可以按照 Spring Data 框架集成的进行搭建,它的 ES 地址和端口在配置类进行配置,不涉及 application.yml 文件。
这里主要演示在 application.yml 文件添加 ES 地址和端口,然后在 ES 配置类进行遍历获取。
两个搭建根据需求自行选择。
2021-11-18 @Young Kbt
添加实体类,没有集成 Spring Data 框架,所以无法使用注解进行映射设置,只能在发送请求给 ES 时,进行映射设置
public class User { /** * 用户名 */ private String username; /** * 年龄 */ private int age; /** * 性别 */ private String sex; public User() { } public User(String username, int age, String sex) { this.username = username; this.age = age; this.sex = sex; } // set // get // toString }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27请加入 Set 和 Get 和 toString 方法,toString 方法是为了测试期间直接打印在控制台。
# 连接测试
首先判断是否成功连接了 ES 服务器
/**
* @author Young Kbt
* @date 2021/11/18 12:41
* @description 是否连接 ES 客户端测试类
*/
@SpringBootTest
public class ESClientTest {
@Autowired
private RestHighLevelClient client;
@Test
public void testESClient() {
System.out.println(client);
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
如果返回 RestHighLevelClient
的地址,则说明成功,如:
org.elasticsearch.client.RestHighLevelClient@7a904f32
# 索引测试
索引的 CRD 代码如下:
@SpringBootTest
public class ESIndexTest {
@Autowired
private RestHighLevelClient esClient;
/**
* 创建索引
*/
@Test
public void createIndex() throws IOException {
// 创建索引的请求对象
CreateIndexRequest request = new CreateIndexRequest("user");
CreateIndexResponse response = esClient.indices().create(request, RequestOptions.DEFAULT);
boolean acknowledged = response.isAcknowledged();
System.out.println("创建索引状态:" + acknowledged);
}
/**
*查看索引
*/
@Test
public void queryIndex() throws IOException {
// 创建索引的请求对象
GetIndexRequest request = new GetIndexRequest("user");
GetIndexResponse response = esClient.indices().get(request, RequestOptions.DEFAULT);
// 打印响应结果
System.out.println("索引别名:" + response.getAliases());
System.out.println("索引映射:" + response.getMappings());
System.out.println("索引配置:" + response.getSettings());
}
/**
* 删除索引
*/
@Test
public void deleteIndex() throws IOException {
// 创建索引的请求对象
DeleteIndexRequest request = new DeleteIndexRequest("user");
AcknowledgedResponse response = esClient.indices().delete(request, RequestOptions.DEFAULT);
System.out.println("删除状态:" + response.isAcknowledged());
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
@SpringBootTest
public class ESIndexTest {
@Autowired
private RestHighLevelClient esClient;
/**
* 创建索引
*/
@Test
public void createIndex() throws IOException {
// 创建索引的请求对象
CreateIndexRequest request = new CreateIndexRequest("user");
CreateIndexResponse response = esClient.indices().create(request, RequestOptions.DEFAULT);
boolean acknowledged = response.isAcknowledged();
System.out.println("创建索引状态:" + acknowledged);
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
@SpringBootTest
public class ESIndexTest {
@Autowired
private RestHighLevelClient esClient;
/**
*查看索引
*/
@Test
public void queryIndex() throws IOException {
// 创建索引的请求对象
GetIndexRequest request = new GetIndexRequest("user");
GetIndexResponse response = esClient.indices().get(request, RequestOptions.DEFAULT);
// 打印响应结果
System.out.println("索引别名:" + response.getAliases());
System.out.println("索引映射:" + response.getMappings());
System.out.println("索引配置:" + response.getSettings());
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
@SpringBootTest
public class ESIndexTest {
@Autowired
private RestHighLevelClient esClient;
/**
* 删除索引
*/
@Test
public void deleteIndex() throws IOException {
// 创建索引的请求对象
DeleteIndexRequest request = new DeleteIndexRequest("user");
AcknowledgedResponse response = esClient.indices().delete(request, RequestOptions.DEFAULT);
System.out.println("删除状态:" + response.isAcknowledged());
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// Make sure to add code blocks to your code group
# 文档测试
关于文档的 CURD 代码如下:
@SpringBootTest
public class ESIndexTest {
@Autowired
private RestHighLevelClient esClient;
/**
* 插入文档数据
*/
@Test
public void createDoc() throws IOException {
IndexRequest request = new IndexRequest();
request.id("1");
User user = new User();
user.setUsername("可乐");
user.setAge(18);
user.setSex("男");
ObjectMapper objectMapper = new ObjectMapper();
String userJsonStr = objectMapper.writeValueAsString(user);
// 添加文档数据,数据格式为 Json 格式
request.source(userJsonStr, XContentType.JSON);
// 客户端发送请求,获取响应对象
IndexResponse response = esClient.index(request, RequestOptions.DEFAULT);
System.out.println("创建的索引:" + response.getIndex());
System.out.println("创建的 id:" + response.getId());
System.out.println("创建的结果:" + response.getResult());
}
/**
* 删除文档数据
*/
@Test
public void deleteDoc() throws IOException {
DeleteRequest request = new DeleteRequest();
// 配置修改参数
request.index("user").id("1001");
// 客户端发送请求,获取响应对象
DeleteResponse response = esClient.delete(request, RequestOptions.DEFAULT);
System.out.println("响应内容:" + response.toString());
}
/**
* 批量删除文档数据
*/
@Test
public void batchDeleteDoc() throws IOException {
BulkRequest request = new BulkRequest();
// 配置修改参数
request.add(new DeleteRequest().index("user").id("1001"));
request.add(new DeleteRequest().index("user").id("1002"));
request.add(new DeleteRequest().index("user").id("1003"));
request.add(new DeleteRequest().index("user").id("1004"));
request.add(new DeleteRequest().index("user").id("1005"));
// 客户端发送请求,获取响应对象
BulkResponse response = esClient.bulk(request, RequestOptions.DEFAULT);
//打印结果信息
System.out.println("响应时间:" + response.getTook());
}
/**
* 查询文档数据
*/
@Test
public void queryDoc() throws IOException {
// 创建请求对象
GetRequest request = new GetRequest();
request.index("user").id("1001");
// 客户端发送请求,获取响应对象
GetResponse response = esClient.get(request, RequestOptions.DEFAULT);
System.out.println("创建的索引:" + response.getIndex());
System.out.println("创建的 id:" + response.getId());
System.out.println("查询的结果:" + response.getSourceAsString());
}
/**
* 修改文档数据
*/
@Test
public void upadateDoc() throws IOException {
UpdateRequest request = new UpdateRequest();
// 配置修改参数
request.index("user").id("1001");
// 设置请求体,对数据进行修改
request.doc(XContentType.JSON, "sex", "男");
// 客户端发送请求,获取响应对象
UpdateResponse response = esClient.update(request, RequestOptions.DEFAULT);
System.out.println("创建的索引:" + response.getIndex());
System.out.println("创建的 id:" + response.getId());
System.out.println("修改的结果:" + response.getResult());
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
@SpringBootTest
public class ESIndexTest {
@Autowired
private RestHighLevelClient esClient;
/**
* 插入文档数据
*/
@Test
public void createDoc() throws IOException {
IndexRequest request = new IndexRequest();
request.id("1");
User user = new User();
user.setUsername("可乐");
user.setAge(18);
user.setSex("男");
ObjectMapper objectMapper = new ObjectMapper();
String userJsonStr = objectMapper.writeValueAsString(user);
// 添加文档数据,数据格式为 Json 格式
request.source(userJsonStr, XContentType.JSON);
// 客户端发送请求,获取响应对象
IndexResponse response = esClient.index(request, RequestOptions.DEFAULT);
System.out.println("创建的索引:" + response.getIndex());
System.out.println("创建的 id:" + response.getId());
System.out.println("创建的结果:" + response.getResult());
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
@SpringBootTest
public class ESIndexTest {
@Autowired
private RestHighLevelClient esClient;
/**
* 批量添加文档数据
*/
@Test
public void batchCreateDoc() throws IOException {
BulkRequest request = new BulkRequest();
ObjectMapper objectMapper = new ObjectMapper();
// 创建数据对象
request.add(new IndexRequest().index("user").id("1001").source(objectMapper.writeValueAsString(new User("可乐", 18, "男")), XContentType.JSON));
request.add(new IndexRequest().index("user").id("1002").source(objectMapper.writeValueAsString(new User("冰糖", 20, "女")), XContentType.JSON));
request.add(new IndexRequest().index("user").id("1003").source(objectMapper.writeValueAsString(new User("雪梨", 22, "女")), XContentType.JSON));
request.add(new IndexRequest().index("user").id("1004").source(objectMapper.writeValueAsString(new User("酸橙", 24, "男")), XContentType.JSON));
request.add(new IndexRequest().index("user").id("1005").source(objectMapper.writeValueAsString(new User("蜜桃", 30, "女")), XContentType.JSON));
// 客户端发送请求,获取响应对象
BulkResponse response = esClient.bulk(request, RequestOptions.DEFAULT);
System.out.println("响应时间:" + response.getTook());
System.out.println("创建的内容:" + Arrays.toString(response.getItems()));
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
@SpringBootTest
public class ESIndexTest {
@Autowired
private RestHighLevelClient esClient;
/**
* 删除文档数据
*/
@Test
public void deleteDoc() throws IOException {
DeleteRequest request = new DeleteRequest();
// 配置修改参数
request.index("user").id("1001");
// 客户端发送请求,获取响应对象
DeleteResponse response = esClient.delete(request, RequestOptions.DEFAULT);
System.out.println("响应内容:" + response.toString());
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
@SpringBootTest
public class ESIndexTest {
@Autowired
private RestHighLevelClient esClient;
/**
* 批量删除文档数据
*/
@Test
public void batchDeleteDoc() throws IOException {
BulkRequest request = new BulkRequest();
// 配置修改参数
request.add(new DeleteRequest().index("user").id("1001"));
request.add(new DeleteRequest().index("user").id("1002"));
request.add(new DeleteRequest().index("user").id("1003"));
request.add(new DeleteRequest().index("user").id("1004"));
request.add(new DeleteRequest().index("user").id("1005"));
// 客户端发送请求,获取响应对象
BulkResponse response = esClient.bulk(request, RequestOptions.DEFAULT);
//打印结果信息
System.out.println("响应时间:" + response.getTook());
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
@SpringBootTest
public class ESIndexTest {
@Autowired
private RestHighLevelClient esClient;
/**
* 查询文档数据
*/
@Test
public void queryDoc() throws IOException {
// 创建请求对象
GetRequest request = new GetRequest();
request.index("user").id("1001");
// 客户端发送请求,获取响应对象
GetResponse response = esClient.get(request, RequestOptions.DEFAULT);
System.out.println("创建的索引:" + response.getIndex());
System.out.println("创建的 id:" + response.getId());
System.out.println("查询的结果:" + response.getSourceAsString());
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
@SpringBootTest
public class ESIndexTest {
@Autowired
private RestHighLevelClient esClient;
/**
* 修改文档数据
*/
@Test
public void upadateDoc() throws IOException {
UpdateRequest request = new UpdateRequest();
// 配置修改参数
request.index("user").id("1001");
// 设置请求体,对数据进行修改
request.doc(XContentType.JSON, "sex", "男");
// 客户端发送请求,获取响应对象
UpdateResponse response = esClient.update(request, RequestOptions.DEFAULT);
System.out.println("创建的索引:" + response.getIndex());
System.out.println("创建的 id:" + response.getId());
System.out.println("修改的结果:" + response.getResult());
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
// Make sure to add code blocks to your code group
# 高级查询测试
高级查询代码如下:
@SpringBootTest
public class ESIndexTest {
@Autowired
private RestHighLevelClient esClient;
/**
* term 查询
*/
@Test
public void termQuery() throws IOException {
SearchRequest request = new SearchRequest();
request.indices("user");
// 构建查询的请求体
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
sourceBuilder.query(QueryBuilders.termsQuery("age", "18","20","22"));
SearchResponse response = esClient.search(request.source(sourceBuilder), RequestOptions.DEFAULT);
// 查询匹配
SearchHits hits = response.getHits();
System.out.println("响应时间:" + response.getTook());
System.out.println("是否超时:" + response.isTimedOut());
System.out.println("命中数量:" + hits.getTotalHits());
System.out.println("MaxScore:" + hits.getMaxScore());
System.out.println("详细数据:");
for (SearchHit hit : hits) {
// 输出每条查询的结果信息
System.out.println(hit.getSourceAsString());
}
}
/**
* 查询所有数据
*/
@Test
public void queryAll() throws IOException {
// 创建搜索请求对象
SearchRequest request = new SearchRequest();
request.indices("user");
// 构建查询的请求体
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
// 查询所有数据
sourceBuilder.query(QueryBuilders.matchAllQuery());
SearchResponse response = esClient.search(request.source(sourceBuilder), RequestOptions.DEFAULT);
// 查询匹配
SearchHits hits = response.getHits();
System.out.println("响应时间:" + response.getTook());
System.out.println("是否超时:" + response.isTimedOut());
System.out.println("命中数量:" + hits.getTotalHits());
System.out.println("MaxScore:" + hits.getMaxScore());
System.out.println("详细数据:");
for (SearchHit hit : hits) {
// 输出每条查询的结果信息
System.out.println(hit.getSourceAsString());
}
}
/**
* 范围查询
*/
@Test
public void rangeQuery() throws IOException {
// 创建搜索请求对象
SearchRequest request = new SearchRequest();
request.indices("user");
// 构建查询的请求体
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
RangeQueryBuilder rangeQuery = QueryBuilders.rangeQuery("age");
// 大于等于
rangeQuery.gte("18");
// 小于等于
rangeQuery.lte("24");
sourceBuilder.query(rangeQuery);
request.source(sourceBuilder);
SearchResponse response = esClient.search(request, RequestOptions.DEFAULT);
// 查询匹配
SearchHits hits = response.getHits();
System.out.println("响应时间:" + response.getTook());
System.out.println("是否超时:" + response.isTimedOut());
System.out.println("命中数量:" + hits.getTotalHits());
System.out.println("MaxScore:" + hits.getMaxScore());
System.out.println("详细数据:");
for (SearchHit hit : hits) {
// 输出每条查询的结果信息
System.out.println(hit.getSourceAsString());
}
}
/**
* 分页查询
*/
@Test
public void pageQuery() throws IOException {
// 创建搜索请求对象
SearchRequest request = new SearchRequest();
request.indices("user");
// 构建查询的请求体
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
sourceBuilder.query(QueryBuilders.matchAllQuery());
// 分页查询
// 当前页其实索引(第一条数据的顺序号),from
sourceBuilder.from(0);
// 每页显示多少条 size
sourceBuilder.size(2);
SearchResponse response = esClient.search(request.source(sourceBuilder), RequestOptions.DEFAULT);
// 查询匹配
SearchHits hits = response.getHits();
System.out.println("响应时间:" + response.getTook());
System.out.println("是否超时:" + response.isTimedOut());
System.out.println("命中数量:" + hits.getTotalHits());
System.out.println("MaxScore:" + hits.getMaxScore());
System.out.println("详细数据:");
for (SearchHit hit : hits) {
// 输出每条查询的结果信息
System.out.println(hit.getSourceAsString());
}
}
/**
* 过滤查询
*/
@Test
public void filterQuery() throws IOException {
// 创建搜索请求对象
SearchRequest request = new SearchRequest();
request.indices("user");
// 构建查询的请求体
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
sourceBuilder.query(QueryBuilders.matchAllQuery());
//查询字段过滤
String[] excludes = {};
String[] includes = {"username", "age"};
sourceBuilder.fetchSource(includes, excludes);
SearchResponse response = esClient.search(request.source(sourceBuilder), RequestOptions.DEFAULT);
// 查询匹配
SearchHits hits = response.getHits();
System.out.println("响应时间:" + response.getTook());
System.out.println("是否超时:" + response.isTimedOut());
System.out.println("命中数量:" + hits.getTotalHits());
System.out.println("MaxScore:" + hits.getMaxScore());
System.out.println("详细数据:");
for (SearchHit hit : hits) {
// 输出每条查询的结果信息
System.out.println(hit.getSourceAsString());
}
}
/**
* Bool 查询
*/
@Test
public void boolQuery() throws IOException {
// 创建搜索请求对象
SearchRequest request = new SearchRequest();
request.indices("user");
// 构建查询的请求体
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
// 必须包含
boolQueryBuilder.must(QueryBuilders.matchQuery("age", "30"));
// 一定不含
boolQueryBuilder.mustNot(QueryBuilders.matchQuery("username", "可乐"));
// 可能包含
boolQueryBuilder.should(QueryBuilders.matchQuery("sex", "男"));
sourceBuilder.query(boolQueryBuilder);
SearchResponse response = esClient.search(request.source(sourceBuilder), RequestOptions.DEFAULT);
// 查询匹配
SearchHits hits = response.getHits();
System.out.println("响应时间:" + response.getTook());
System.out.println("是否超时:" + response.isTimedOut());
System.out.println("命中数量:" + hits.getTotalHits());
System.out.println("MaxScore:" + hits.getMaxScore());
System.out.println("详细数据:");
for (SearchHit hit : hits) {
// 输出每条查询的结果信息
System.out.println(hit.getSourceAsString());
}
}
/**
* 模糊查询
*/
@Test
public void fuzzyQuery() throws IOException {
// 创建搜索请求对象
SearchRequest request = new SearchRequest();
request.indices("user");
// 构建查询的请求体
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
sourceBuilder.query(QueryBuilders.fuzzyQuery("age",2).fuzziness(Fuzziness.ONE));
SearchResponse response = esClient.search(request.source(sourceBuilder), RequestOptions.DEFAULT);
// 查询匹配
SearchHits hits = response.getHits();
System.out.println("响应时间:" + response.getTook());
System.out.println("是否超时:" + response.isTimedOut());
System.out.println("命中数量:" + hits.getTotalHits());
System.out.println("MaxScore:" + hits.getMaxScore());
System.out.println("详细数据:");
for (SearchHit hit : hits) {
// 输出每条查询的结果信息
System.out.println(hit.getSourceAsString());
}
}
/**
* 高亮查询
*/
@Test
public void heightQuery() throws IOException {
// 高亮查询
SearchRequest request = new SearchRequest().indices("user");
// 2.创建查询请求体构建器
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
// 构建查询方式:高亮查询
sourceBuilder.query(QueryBuilders.termsQuery("age","18","20"));
// 构建高亮字段
HighlightBuilder highlightBuilder = new HighlightBuilder();
highlightBuilder.preTags("<font color='red'>");// 设置标签前缀
highlightBuilder.postTags("</font>");// 设置标签后缀
highlightBuilder.field("age");// 设置高亮字段
//设置高亮构建对象
sourceBuilder.highlighter(highlightBuilder);
// 3.客户端发送请求,获取响应对象
SearchResponse response = esClient.search(request.source(sourceBuilder), RequestOptions.DEFAULT);
//4.打印响应结果
SearchHits hits = response.getHits();
System.out.println("响应时间:" + response.getTook());
System.out.println("是否超时:" + response.isTimedOut());
System.out.println("命中数量:" + hits.getTotalHits());
System.out.println("MaxScore:" + hits.getMaxScore());
System.out.println("详细数据:");
for (SearchHit hit : hits) {
String sourceAsString = hit.getSourceAsString();
System.out.println(sourceAsString);
// 打印高亮结果
Map<String, HighlightField> highlightFields = hit.getHighlightFields();
System.out.println(highlightFields);
}
}
/**
* 排序查询
*/
@Test
public void sortQuery() throws IOException {
// 创建搜索请求对象
SearchRequest request = new SearchRequest();
request.indices("user");
// 构建查询的请求体
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
sourceBuilder.query(QueryBuilders.matchAllQuery());
// 排序
sourceBuilder.sort("age", SortOrder.ASC);
SearchResponse response = esClient.search(request.source(sourceBuilder), RequestOptions.DEFAULT);
// 查询匹配
SearchHits hits = response.getHits();
System.out.println("响应时间:" + response.getTook());
System.out.println("是否超时:" + response.isTimedOut());
System.out.println("命中数量:" + hits.getTotalHits());
System.out.println("MaxScore:" + hits.getMaxScore());
System.out.println("详细数据:");
for (SearchHit hit : hits) {
// 输出每条查询的结果信息
System.out.println(hit.getSourceAsString());
}
}
/**
* 聚合查询
* 最大年龄
* 高亮查询
*/
@Test
public void arrgQuery1() throws IOException {
// 最大年龄
// 高亮查询
SearchRequest request = new SearchRequest().indices("user");
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
sourceBuilder.aggregation(AggregationBuilders.max("maxAge").field("age"));
// 3.客户端发送请求,获取响应对象
SearchResponse response = esClient.search(request.source(sourceBuilder), RequestOptions.DEFAULT);
// 4.打印响应结果
System.out.println(response);
}
/**
* 聚合查询
* 分组统计
* 高亮查询
*/
@Test
public void arrgQuery2() throws IOException {
// 分组统计
// 高亮查询
SearchRequest request = new SearchRequest().indices("user");
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
sourceBuilder.aggregation(AggregationBuilders.terms("ageGroupby").field("age"));
// 3.客户端发送请求,获取响应对象
SearchResponse response = esClient.search(request.source(sourceBuilder), RequestOptions.DEFAULT);
// 4.打印响应结果
SearchHits hits = response.getHits();
System.out.println(response);
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
@SpringBootTest
public class ESIndexTest {
@Autowired
private RestHighLevelClient esClient;
/**
* term 查询
*/
@Test
public void termQuery() throws IOException {
SearchRequest request = new SearchRequest();
request.indices("user");
// 构建查询的请求体
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
sourceBuilder.query(QueryBuilders.termsQuery("age", "18","20","22"));
SearchResponse response = esClient.search(request.source(sourceBuilder), RequestOptions.DEFAULT);
// 查询匹配
SearchHits hits = response.getHits();
System.out.println("响应时间:" + response.getTook());
System.out.println("是否超时:" + response.isTimedOut());
System.out.println("命中数量:" + hits.getTotalHits());
System.out.println("MaxScore:" + hits.getMaxScore());
System.out.println("详细数据:");
for (SearchHit hit : hits) {
// 输出每条查询的结果信息
System.out.println(hit.getSourceAsString());
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
@SpringBootTest
public class ESIndexTest {
@Autowired
private RestHighLevelClient esClient;
/**
* 查询所有数据
*/
@Test
public void queryAll() throws IOException {
// 创建搜索请求对象
SearchRequest request = new SearchRequest();
request.indices("user");
// 构建查询的请求体
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
// 查询所有数据
sourceBuilder.query(QueryBuilders.matchAllQuery());
SearchResponse response = esClient.search(request.source(sourceBuilder), RequestOptions.DEFAULT);
// 查询匹配
SearchHits hits = response.getHits();
System.out.println("响应时间:" + response.getTook());
System.out.println("是否超时:" + response.isTimedOut());
System.out.println("命中数量:" + hits.getTotalHits());
System.out.println("MaxScore:" + hits.getMaxScore());
System.out.println("详细数据:");
for (SearchHit hit : hits) {
// 输出每条查询的结果信息
System.out.println(hit.getSourceAsString());
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
@SpringBootTest
public class ESIndexTest {
@Autowired
private RestHighLevelClient esClient;
/**
* 范围查询
*/
@Test
public void rangeQuery() throws IOException {
// 创建搜索请求对象
SearchRequest request = new SearchRequest();
request.indices("user");
// 构建查询的请求体
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
RangeQueryBuilder rangeQuery = QueryBuilders.rangeQuery("age");
// 大于等于
rangeQuery.gte("18");
// 小于等于
rangeQuery.lte("24");
sourceBuilder.query(rangeQuery);
request.source(sourceBuilder);
SearchResponse response = esClient.search(request, RequestOptions.DEFAULT);
// 查询匹配
SearchHits hits = response.getHits();
System.out.println("响应时间:" + response.getTook());
System.out.println("是否超时:" + response.isTimedOut());
System.out.println("命中数量:" + hits.getTotalHits());
System.out.println("MaxScore:" + hits.getMaxScore());
System.out.println("详细数据:");
for (SearchHit hit : hits) {
// 输出每条查询的结果信息
System.out.println(hit.getSourceAsString());
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
@SpringBootTest
public class ESIndexTest {
@Autowired
private RestHighLevelClient esClient;
/**
* 分页查询
*/
@Test
public void pageQuery() throws IOException {
// 创建搜索请求对象
SearchRequest request = new SearchRequest();
request.indices("user");
// 构建查询的请求体
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
sourceBuilder.query(QueryBuilders.matchAllQuery());
// 分页查询
// 当前页其实索引(第一条数据的顺序号),from
sourceBuilder.from(0);
// 每页显示多少条 size
sourceBuilder.size(2);
SearchResponse response = esClient.search(request.source(sourceBuilder), RequestOptions.DEFAULT);
// 查询匹配
SearchHits hits = response.getHits();
System.out.println("响应时间:" + response.getTook());
System.out.println("是否超时:" + response.isTimedOut());
System.out.println("命中数量:" + hits.getTotalHits());
System.out.println("MaxScore:" + hits.getMaxScore());
System.out.println("详细数据:");
for (SearchHit hit : hits) {
// 输出每条查询的结果信息
System.out.println(hit.getSourceAsString());
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
@SpringBootTest
public class ESIndexTest {
@Autowired
private RestHighLevelClient esClient;
/**
* 过滤查询
*/
@Test
public void filterQuery() throws IOException {
// 创建搜索请求对象
SearchRequest request = new SearchRequest();
request.indices("user");
// 构建查询的请求体
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
sourceBuilder.query(QueryBuilders.matchAllQuery());
//查询字段过滤
String[] excludes = {};
String[] includes = {"username", "age"};
sourceBuilder.fetchSource(includes, excludes);
SearchResponse response = esClient.search(request.source(sourceBuilder), RequestOptions.DEFAULT);
// 查询匹配
SearchHits hits = response.getHits();
System.out.println("响应时间:" + response.getTook());
System.out.println("是否超时:" + response.isTimedOut());
System.out.println("命中数量:" + hits.getTotalHits());
System.out.println("MaxScore:" + hits.getMaxScore());
System.out.println("详细数据:");
for (SearchHit hit : hits) {
// 输出每条查询的结果信息
System.out.println(hit.getSourceAsString());
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
@SpringBootTest
public class ESIndexTest {
@Autowired
private RestHighLevelClient esClient;
/**
* Bool 查询
*/
@Test
public void boolQuery() throws IOException {
// 创建搜索请求对象
SearchRequest request = new SearchRequest();
request.indices("user");
// 构建查询的请求体
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
// 必须包含
boolQueryBuilder.must(QueryBuilders.matchQuery("age", "30"));
// 一定不含
boolQueryBuilder.mustNot(QueryBuilders.matchQuery("username", "可乐"));
// 可能包含
boolQueryBuilder.should(QueryBuilders.matchQuery("sex", "男"));
sourceBuilder.query(boolQueryBuilder);
SearchResponse response = esClient.search(request.source(sourceBuilder), RequestOptions.DEFAULT);
// 查询匹配
SearchHits hits = response.getHits();
System.out.println("响应时间:" + response.getTook());
System.out.println("是否超时:" + response.isTimedOut());
System.out.println("命中数量:" + hits.getTotalHits());
System.out.println("MaxScore:" + hits.getMaxScore());
System.out.println("详细数据:");
for (SearchHit hit : hits) {
// 输出每条查询的结果信息
System.out.println(hit.getSourceAsString());
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
@SpringBootTest
public class ESIndexTest {
@Autowired
private RestHighLevelClient esClient;
/**
* 模糊查询
*/
@Test
public void fuzzyQuery() throws IOException {
// 创建搜索请求对象
SearchRequest request = new SearchRequest();
request.indices("user");
// 构建查询的请求体
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
sourceBuilder.query(QueryBuilders.fuzzyQuery("age",2).fuzziness(Fuzziness.ONE));
SearchResponse response = esClient.search(request.source(sourceBuilder), RequestOptions.DEFAULT);
// 查询匹配
SearchHits hits = response.getHits();
System.out.println("响应时间:" + response.getTook());
System.out.println("是否超时:" + response.isTimedOut());
System.out.println("命中数量:" + hits.getTotalHits());
System.out.println("MaxScore:" + hits.getMaxScore());
System.out.println("详细数据:");
for (SearchHit hit : hits) {
// 输出每条查询的结果信息
System.out.println(hit.getSourceAsString());
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
@SpringBootTest
public class ESIndexTest {
@Autowired
private RestHighLevelClient esClient;
/**
* 高亮查询
*/
@Test
public void heightQuery() throws IOException {
// 高亮查询
SearchRequest request = new SearchRequest().indices("user");
// 2.创建查询请求体构建器
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
// 构建查询方式:高亮查询
sourceBuilder.query(QueryBuilders.termsQuery("age","18","20"));
// 构建高亮字段
HighlightBuilder highlightBuilder = new HighlightBuilder();
highlightBuilder.preTags("<font color='red'>");// 设置标签前缀
highlightBuilder.postTags("</font>");// 设置标签后缀
highlightBuilder.field("age");// 设置高亮字段
//设置高亮构建对象
sourceBuilder.highlighter(highlightBuilder);
// 3.客户端发送请求,获取响应对象
SearchResponse response = esClient.search(request.source(sourceBuilder), RequestOptions.DEFAULT);
//4.打印响应结果
SearchHits hits = response.getHits();
System.out.println("响应时间:" + response.getTook());
System.out.println("是否超时:" + response.isTimedOut());
System.out.println("命中数量:" + hits.getTotalHits());
System.out.println("MaxScore:" + hits.getMaxScore());
System.out.println("详细数据:");
for (SearchHit hit : hits) {
String sourceAsString = hit.getSourceAsString();
System.out.println(sourceAsString);
// 打印高亮结果
Map<String, HighlightField> highlightFields = hit.getHighlightFields();
System.out.println(highlightFields);
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
@SpringBootTest
public class ESIndexTest {
@Autowired
private RestHighLevelClient esClient;
/**
* 排序查询
*/
@Test
public void sortQuery() throws IOException {
// 创建搜索请求对象
SearchRequest request = new SearchRequest();
request.indices("user");
// 构建查询的请求体
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
sourceBuilder.query(QueryBuilders.matchAllQuery());
// 排序
sourceBuilder.sort("age", SortOrder.ASC);
SearchResponse response = esClient.search(request.source(sourceBuilder), RequestOptions.DEFAULT);
// 查询匹配
SearchHits hits = response.getHits();
System.out.println("响应时间:" + response.getTook());
System.out.println("是否超时:" + response.isTimedOut());
System.out.println("命中数量:" + hits.getTotalHits());
System.out.println("MaxScore:" + hits.getMaxScore());
System.out.println("详细数据:");
for (SearchHit hit : hits) {
// 输出每条查询的结果信息
System.out.println(hit.getSourceAsString());
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
@SpringBootTest
public class ESIndexTest {
@Autowired
private RestHighLevelClient esClient;
/**
* 聚合查询
* 最大年龄
* 高亮查询
*/
@Test
public void arrgQuery1() throws IOException {
// 最大年龄
// 高亮查询
SearchRequest request = new SearchRequest().indices("user");
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
sourceBuilder.aggregation(AggregationBuilders.max("maxAge").field("age"));
// 3.客户端发送请求,获取响应对象
SearchResponse response = esClient.search(request.source(sourceBuilder), RequestOptions.DEFAULT);
// 4.打印响应结果
System.out.println(response);
}
/**
* 聚合查询
* 分组统计
* 高亮查询
*/
@Test
public void arrgQuery2() throws IOException {
// 分组统计
// 高亮查询
SearchRequest request = new SearchRequest().indices("user");
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
sourceBuilder.aggregation(AggregationBuilders.terms("ageGroupby").field("age"));
// 3.客户端发送请求,获取响应对象
SearchResponse response = esClient.search(request.source(sourceBuilder), RequestOptions.DEFAULT);
// 4.打印响应结果
SearchHits hits = response.getHits();
System.out.println(response);
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
// Make sure to add code blocks to your code group
# Spark Streaming框架集成
# Spark Streaming框架介绍
Spark Streaming 是 Spark core API 的扩展,支持实时数据流的处理,并且具有可扩展,高吞吐量,容错的特点。数据可以从许多来源获取,如 Kafka,Flume,Kinesis 或 TCP sockets,并且可以使用复杂的算法进行处理,这些算法使用诸如 map,reduce,join 和 window 等高级函数表示。最后,处理后的数据可以推送到文件系统,数据库等。实际上,您可以将 Spark 的机器学习和图形处理算法应用于数据流。
# 框架搭建
创建一个 Maven 项目
修改 pom 文件,增加依赖
<dependencies> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_2.12</artifactId> <version>3.0.0</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming_2.12</artifactId> <version>3.0.0</version> </dependency> <dependency> <groupId>org.elasticsearch</groupId> <artifactId>elasticsearch</artifactId> <version>7.15.2</version> </dependency> <!-- elasticsearch 的客户端 --> <dependency> <groupId>org.elasticsearch.client</groupId> <artifactId>elasticsearch-rest-high-level-client</artifactId> <version>7.15.2</version> </dependency> <!-- elasticsearch 依赖 2.x 的 log4j --> <dependency> <groupId>org.apache.logging.log4j</groupId> <artifactId>log4j-api</artifactId> <version>2.8.2</version> </dependency> <dependency> <groupId>org.apache.logging.log4j</groupId> <artifactId>log4j-core</artifactId> <version>2.8.2</version> </dependency> <!-- <dependency>--> <!-- <groupId>com.fasterxml.jackson.core</groupId>--> <!-- <artifactId>jackson-databind</artifactId>--> <!-- <version>2.11.1</version>--> <!-- </dependency>--> <!-- <!– junit 单元测试 –>--> <!-- <dependency>--> <!-- <groupId>junit</groupId>--> <!-- <artifactId>junit</artifactId>--> <!-- <version>4.12</version>--> <!-- </dependency>--> </dependencies>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
# 功能实现
import org.apache.http.HttpHost
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.elasticsearch.action.index.IndexRequest
import org.elasticsearch.client.indices.CreateIndexRequest
import org.elasticsearch.client.{RequestOptions, RestClient,
RestHighLevelClient}
import org.elasticsearch.common.xcontent.XContentType
import java.util.Date
object SparkStreamingESTest {
def main(args: Array[String]): Unit = {
val sparkConf = new
SparkConf().setMaster("local[*]").setAppName("ESTest")
val ssc = new StreamingContext(sparkConf, Seconds(3))
val ds: ReceiverInputDStream[String] = ssc.socketTextStream("localhost", 9999)
ds.foreachRDD(
rdd => {
println("*************** " + new Date())
rdd.foreach(
data => {
val client = new RestHighLevelClient(
RestClient.builder(new HttpHost("localhost", 9200, "http"))
);
// 新增文档 - 请求对象
val request = new IndexRequest();
// 设置索引及唯一性标识
val ss = data.split(" ")
println("ss = " + ss.mkString(","))
request.index("sparkstreaming").id(ss(0));
val productJson = s""" | { "data":"${ss(1)}" } |""".stripMargin;
// 添加文档数据,数据格式为 JSON 格式
request.source(productJson,XContentType.JSON);
// 客户端发送请求,获取响应对象
val response = client.index(request, RequestOptions.DEFAULT);
System.out.println("_index:" + response.getIndex());
System.out.println("_id:" + response.getId());
System.out.println("_result:" + response.getResult());
client.close()
}
)
}
)
ssc.start()
ssc.awaitTermination()
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
# Flink框架集成
# Flink框架介绍
Apache Spark 是一种基于内存的快速、通用、可扩展的大数据分析计算引擎。
Apache Spark 掀开了内存计算的先河,以内存作为赌注,赢得了内存计算的飞速发展。但是在其火热的同时,开发人员发现,在 Spark 中,计算框架普遍存在的缺点和不足依然没有完全解决,而这些问题随着 5G 时代的来临以及决策者对实时数据分析结果的迫切需要而凸显的更加明显:
- 数据精准一次性处理(Exactly-Once)
- 乱序数据,迟到数据
- 低延迟,高吞吐,准确性
- 容错性
Apache Flink 是一个框架和分布式处理引擎,用于对无界和有界数据流进行有状态计算。在 Spark 火热的同时,也默默地发展自己,并尝试着解决其他计算框架的问题。
慢慢地,随着这些问题的解决,Flink 慢慢被绝大数程序员所熟知并进行大力推广,阿里公司在 2015 年改进 Flink,并创建了内部分支 Blink,目前服务于阿里集团内部搜索、推荐、广告和蚂蚁等大量核心实时业务。
# 框架搭建
创建一个 Maven 项目
修改 pom 文件,增加相关依赖
<dependencies> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-scala_2.12</artifactId> <version>1.12.0</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-scala_2.12</artifactId> <version>1.12.0</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-clients_2.12</artifactId> <version>1.12.0</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-elasticsearch7_2.11</artifactId> <version>1.12.0</version> </dependency> <!-- jackson --> <dependency> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-core</artifactId> <version>2.11.1</version> </dependency> </dependencies>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
# 功能实现
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import
org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunctio
n;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import
org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink;
import org.apache.http.HttpHost;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class FlinkElasticsearchSinkTest {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<String> source = env.socketTextStream("localhost",
9999);
List<HttpHost> httpHosts = new ArrayList<>();
httpHosts.add(new HttpHost("127.0.0.1", 9200, "http"));
//httpHosts.add(new HttpHost("10.2.3.1", 9200, "http"));
// use a ElasticsearchSink.Builder to create an ElasticsearchSink
ElasticsearchSink.Builder<String> esSinkBuilder = new ElasticsearchSink.Builder<>(httpHosts,
new ElasticsearchSinkFunction<String>() {
public IndexRequest createIndexRequest(String element) {
Map<String, String> json = new HashMap<>();
json.put("data", element);
return Requests.indexRequest()
.index("my-index")
//.type("my-type")
.source(json);
}
@Override
public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
indexer.add(createIndexRequest(element));
}
}
);
// configuration for the bulk requests; this instructs the sink to emit after
every element, otherwise they would be buffered esSinkBuilder.setBulkFlushMaxActions(1);
// provide a RestClientFactory for custom configuration on the internally created REST client
// esSinkBuilder.setRestClientFactory(
// restClientBuilder -> {
// restClientBuilder.setDefaultHeaders(...)
// restClientBuilder.setMaxRetryTimeoutMillis(...)
// restClientBuilder.setPathPrefix(...)
// restClientBuilder.setHttpClientConfigCallback(...)
// }
// );
source.addSink(esSinkBuilder.build());
env.execute("flink-es");
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57