在这个教学里,我们会使用到以下关键技术栈:
- Spring Boot
- Elasticsearch
- Logstash
- MySQL
Spring Boot 主要作为我们对外的 API 接口,Elasticsearch 则是作为我们的搜索引擎。假设我们本来已经在使用 MySQL 作为我们的数据库,这篇教程会教你如何利用 Logstash 实现 MySQL 与 Elasticsearch 的实时同步。
Spring Boot 设置
添加依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-elasticsearch</artifactId>
</dependency>
<dependency>
<groupId>io.searchbox</groupId>
<artifactId>jest</artifactId>
<version>6.3.1</version>
</dependency>
我们需要通过 jest (Java Rest Client) 向 elasticsearch 9200 端口发送 HTTP Rest 请求来进行查询。
application.properties 设置
spring.elasticsearch.jest.uris=http://localhost:9200
spring.elasticsearch.jest.read-timeout=5000
EsStudyCard.java
@Document(indexName = "cards", type = "card")
public class EsStudyCard {
@Id
private int id;
private String title;
private String description;
// setters and getters
}
这里需要注意的是,我们不能对同一个实体类 (entity class) 同时使用注解 @Entity 与 @Document,因为 @Entity 表示由 JPA 掌管这个实体类,而 @Document 表示由 Elasticsearch 掌管这个实体类,两个一起使用的话会产生冲突并导致异常。因此我们需要创建另一个实体类专门给 Elasticsearch 使用。
如果是使用 Docker 容器或外部 Elasticsearch 服务器,请将 localhost 改成相应的主机名或主机地址。
EsStudyCardService.java
@Service
public class EsStudyCardService {
@Autowired
private JestClient jestClient;
public List<EsStudyCard> search(String content) {
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
searchSourceBuilder.query(
QueryBuilders
.boolQuery()
.should(QueryBuilders.matchQuery("title", content).fuzziness("10").operator(Operator.OR))
.should(QueryBuilders.matchQuery("description", content).fuzziness("10").operator(Operator.OR))
.minimumShouldMatch(1)
);
Search search = new Search
.Builder(searchSourceBuilder.toString())
.addIndex("cards")
.addType("card")
.build();
try {
JestResult result = jestClient.execute(search);
return result.getSourceAsObjectList(EsStudyCard.class);
} catch (IOException e) {
e.printStackTrace();
return null;
}
}
}
这是模糊搜索的一个写法。首先我们创建一个 SearchSourceBuilder 的实例,接着通过调用 query() 方法并传递一个 QueryBuilder 实例 (通过 QueryBuilders 接口创建)。在这里我们的 QueryBuilder 会查询任何满足模糊搜索条件的数据。这里我们使用 should() 方法并加上 minimumShouldMatch(1) 方法来表示满足 title or description fields 的逻辑。
在 should() 方法里传递 QueryBuilder,在这里我们使用 fuziness() 方法表示我们想要使用模糊搜索,传递的参数表示模糊搜索词的 edit distance。接着我们调用 operator() 方法并传递 Operator.OR 表示如果我们这里的 content 包含了多个词,则会对应每个词进行模糊搜索并用 OR 的逻辑关系将他们串联起来。举例,如果我们的搜索词是 “red apple”,则搜索条件为满足 “red” 模糊搜索 OR 满足 “apple” 模糊搜索。
接着我们通过 Search 来指定 index 与 type。然后使用 JestClient 发送查询请求。在这里我们只需要确保我们加入了 JestClient 的依赖 (如之前所示),然后使用 @Autowired 注解,Spring Boot 就会帮我们注入 JestClient 的实例了。
EsStudyCardController.java
@PostMapping(value = "/esstudycard/search")
public ResponseEntity<Object> search(@RequestBody JSONObject payload) {
String content = (String) payload.get("content");
List<EsStudyCard> studyCards = esStudyCardService.search(content);
JSONObject jsonObject = new JSONObject();
jsonObject.put("data", studyCards);
return ResponseEntity.ok().body(jsonObject);
}
接着在 controller 类里面只需要简单调用 EsStudyCardService 的 search 方法就可以了。
Logstash 配置文件
以 MySQL 为例,我们需要下载相应的 jdbc 驱动,并放在指定的目录下。
接着我们创建一个如下的 Logstash 配置文件。这里以 Docker 容器为例,通过 volume 的形式将该 Logstash 配置文件映射到 /usr/share/logstash/pipeline (CentOS 系统). Logstash 会自动检测 pipeline 文件下的配置文件。
input {
jdbc {
jdbc_connection_string => "jdbc:mysql://localhost:3306/vanpanda"
jdbc_user => "root"
jdbc_password => "root"
jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
statement => "SELECT * FROM study_cards where modified_timestamp > :sql_last_value order by modified_timestamp;"
use_column_value => true
tracking_column_type => "timestamp"
tracking_column => "modified_timestamp"
schedule => "* * * * *"
jdbc_default_timezone => "America/Los_Angeles"
sequel_opts => {
fractional_seconds => true
}
}
}
output {
elasticsearch {
index => "cards"
document_type => "card"
document_id => "%{id}"
hosts => ["localhost:9200"]
}
}
该配置文件主要是告诉 Logstash 数据源的位置。这里我们的数据源就是我们的 MySQL 数据库。根据该配置文件,Logstash 每分钟会发送我们指定的查询语句(query),接着将返回的数据插入到 Elasticsearch 中,实现近似实时同步。
请将 localhost 改成相应的主机名或主机地址。
查看 Elasticsearch 的状态或数据
我们可以通过 web api 的方式来查看 Elasticsearch 的状态,包括它的index。
http://localhost:9200/{index_name}
http://localhost:9200/_cat/indices?v
http://localhost:9200/{index_name}/_search?pretty
将上述 url 直接复制到浏览器即可通过 Elasticsearch 9200 端口的 Http Rest api 进行查找。
让 Logstash 持续运行,并检查数据更新
当 Logstash 完成第一次任务之后会自动 shut down。我们需要 Logstash 持续在 Docker 的容器中运行并每隔一段时间就执行一次任务来确保 Elasticsearch 与 MySQL 可以达到几乎实时同步的效果。
在 Logstash 的配置文件里,在 input jdbc 里配置 schedule 设定,否则 Logstash 会认为这只是一次性任务,并在第一次执行之后就会自动关闭。
input {
jdbc {
...
schedule => "* * * * *" #加入这个,表示每分钟更新
}
}
例子中的语法代表每分钟执行一次任务。我们可以查看语法文档自定义时间。
确保 Logstash 不会重复更新旧数据
如果我们没有告诉 Logstash 如何判断数据的新旧,Logstash 每次执行任务时,便会把旧数据删除,再重新插入一次,导致资源浪费。
对 Logstash 做如下配置:
input {
jdbc {
...
# 需要利用 Modified Date / Timestamp 来判断数据新旧
statement => "SELECT" * FROM testtable where Date > :sql_last_value order by Date"
# 告诉 Logstash 使用 column value for
use_column_value => true :sql_last_value
# 指定 column Date 作为 :sql_last_value 的值
tracking_column => Date
# 如果 column type 是 timestamp,则一定要配置这项。默认为 numeric
tracking_column_type => timestamp
}
错误信息与解决办法
Unable to find driver class via URLClassLoader in given driver jars: com.mysql.jdbc.Driver and com.mysql.jdbc.Driver
logstash 版本 6.2.x 以及以上,conf 文件里不要设定 jdbc_driver_library,将 connector
driver 的 jar 文件直接放在 /usr/share/logstash/logstash-core/lib/jars/ 即可。
Logstash 运行之后,Elasticsearch 里只有一条数据,其他数据没有存进来。
主要原因是因为数据的id重复了,因此 Elasticsearch 会将旧数据删除,并插入新数据。 通过检查 index 的状态
(http://localhost:9200/_cat/indices?v),可以发现有数据被 deleted 了。
当我们在设置 Logstash 的 conf 文件时,在 output document_id 这一项,使用通配符 %{field_name} 来指定一个唯一的 id。比如说,我们从数据库 select 出来的 fields 里有一个作为 primary key 的 field 名为 id,我们则可以这样设置:%{id}。Logstash 就会以这个 field 里的值来作为 Elasticsearch 里的 id。
Logstash sql_last_value is always 0
如果 tracking_column 是 timestamp的话,记得设定
tracking_column_type => timestamp
Logstash sql_last_value timestamp is not consistent with database
一个可能的原因是因为 logstash :sql_last_value 的 timezone 与数据库的不一致。因为
Elasticsearch 跟 Logstash 默认使用 UTC timezone。
解决方法:在 Logstash 的配置文件里,设置 jdbc_default_timezone 为你想要的 timezone,则 Elasticsearch 与 Logstash 会据此进行转换。
MySQL Docker 容器第一次启动的时候出现 ERROR 1396 (HY000) at line 1: Operation CREATE USER failed for ‘root’@’%’ 然后就关掉了
可能是因为我们在 docker-compose.yml 里面传递参数给 MySQL 容器的时候,重复创建名为 root
的用户所导致的错误。尝试换个用户名试试看。或者如果要继续使用 root 用户来创建表的话,只需要在 docker-compose.yml 设置 MYSQL_ROOT_PASSWORD 就可以了。
Logstash :sql_last_value 的 timestamp 把秒之后的毫秒给 truncate 掉了,这时候如果数据库的 timestamp 保有毫米信息的话,则会发生重复选取数据的问题。
例如: 如果数据库的时间戳为 2019-11-20 09:28:03.042000,Logstash 的 sql_last_value 则会变成 2019-11-20 09:28:03。而我们的 query 是想选取 > :sql_last_value 的,由于数据库里的这条数据有毫秒信息,那么他永远都会比较大而导致重复选取数据。
这个问题已经在 logstash-jdbc-input 4.3.5 版本里修复了。如果我们用的是最新的 Logstash 版本,则我们容器自动安装的 jdbc-input 插件也应该是有这个修复的版本。
此时,我们还必须在 Logstash 配置文件为 jdbc 做如下配置才可以使这个修复生效。
sequel_opts => {
fractional_seconds => true
}
在 Spring Boot 里,当我们对同一个Entity同时使用 @Entity 与 @Document 时,出现以下错误:
The bean ‘studyCardPagedJpaRepository’, defined in null, could not be registered. A bean with that name has already been defined in null and overriding is disabled.
因为 @Entity 表示由 JPA 掌管这个实体类,而 @Document 表示由 Elasticsearch
掌管这个实体类,两个一起使用的话会产生冲突并导致异常。因此我们需要创建另一个实体类专门给 Elasticsearch 使用。