目录
数据聚合
聚合的分类
编辑 DSL实现Bucket聚合
编辑
DSL实现Metrics聚合编辑
RestAPI实现聚合
对接前端接口编辑
自定义分词器编辑
Completion suggester查询
Completion suggester查询
酒店数据自动补全
实现酒店搜索框界面输入框的自动补全
数据同步问题分析编辑
同步问题分析
数据同步问题分析
数据问题分析
利用 MQ实现mysql与elasticearch数据同步
ES集群机构
ES集群的节点角色
ES集群的脑裂
ES集群的分布式存储
ES集群的分布式存储
ES集群的分布式查询
ES集群的故障转移
数据聚合
聚合的分类
DSL实现Bucket聚合
#聚合功能
GET /hotel/_search
{
"size": 0,
"aggs": {
"brandAgg": {
"terms": {
"field": "brand",
"size": 20
}
}
}
}
#聚合功能,自定义排序规则
GET /hotel/_search
{
"size": 0,
"aggs": {
"brandAgg": {
"terms": {
"field": "brand",
"size": 20,
"order": {
"_count": "asc"
}
}
}
}
}
#聚合功能,限定聚合范围
GET /hotel/_search
{
"query": {
"range": {
"price": {
"lte": 200
}
}
},
"size": 0,
"aggs": {
"brandAgg": {
"terms": {
"field": "brand",
"size": 20
}
}
}
}
DSL实现Metrics聚合
嵌套聚合metric
GET /hotel/_search
{
"size": 0,
"aggs": {
"brandAgg": {
"terms": {
"field": "brand",
"size": 20,
"order": {
"scoreAgg.avg": "desc"
}
},
"aggs": {
"scoreAgg": {
"stats": {
"field": "score"
}
}
}
}
}
}
RestAPI实现聚合
@Override
public Map<String, List<String>> filters() {
try {
//1.准备Request
SearchRequest request = new SearchRequest("hotel");
//2.准备DSL
//2.1设置size
request.source().size(0);
//2.2 聚合
buildAggregation(request);
//3.发出请求
SearchResponse response = client.search(request, RequestOptions.DEFAULT);
//4.解析结果
Map<String, List<String>> result = new HashMap<>();
Aggregations aggregations = response.getAggregations();
//TODO 根据品牌名称,获取品牌结果
List<String> brandList = getAggByName(aggregations,"brandAgg");
result.put("brand",brandList);
//TODO 根据品牌名称,获取品牌结果
List<String> cityList = getAggByName(aggregations,"cityAgg");
result.put("city",cityList);
//TODO 根据品牌名称,获取品牌结果
List<String> starList = getAggByName(aggregations,"starAgg");
result.put("starName",starList);
return result;
}
catch (IOException e) {
throw new RuntimeException(e);
}
}
private List<String> getAggByName(Aggregations aggregations,String aggName) {
//4.1根据聚合名称获取聚合结果
Terms brandTerms = aggregations.get(aggName);
//4.2获取buckets
List<? extends Terms.Bucket> buckets = brandTerms.getBuckets();
//4.3遍历
List<String> brandList = new ArrayList<>();
for ( Terms.Bucket bucket : buckets ) {
String key = bucket.getKeyAsString();
// System.out.println(key);
brandList.add(key);
}
return brandList;
}
对接前端接口
POST /_analyze
{
"text": ["如家酒店还不错"],
"analyzer": "pinyin"
}
自定义分词器
POST /test2/_analyze
{
"text": ["如家酒店还不错"],
"analyzer": "my_analyzer"
}
// 自定义拼音分词器
PUT /test2
{
"settings": {
"analysis": {
"analyzer": {
"my_analyzer": {
"tokenizer": "ik_max_word",
"filter": "py"
}
},
"filter": {
"py": {
"type": "pinyin",
"keep_full_pinyin": false,
"keep_joined_full_pinyin": true,
"keep_original": true,
"limit_first_letter_length": 16,
"remove_duplicated_term": true,
"none_chinese_pinyin_tokenize": false
}
}
}
},
"mappings": {
"properties": {
"name":{
"type": "text",
"analyzer": "my_analyzer"
}
}
}
}
Completion suggester查询
Completion suggester查询
// 自动补全的索引库
PUT test
{
"mappings": {
"properties": {
"title":{
"type": "completion"
}
}
}
}
// 示例数据
POST test/_doc
{
"title": ["Sony", "WH-1000XM3"]
}
POST test/_doc
{
"title": ["SK-II", "PITERA"]
}
POST test/_doc
{
"title": ["Nintendo", "switch"]
}
// 自动补全查询
POST /test/_search
{
"suggest": {
"title_suggest": {
"text": "s", // 关键字
"completion": {
"field": "title", // 补全字段
"skip_duplicates": true, // 跳过重复的
"size": 10 // 获取前10条结果
}
}
}
}
酒店数据自动补全
#酒店数据索引库
GET /hotel/_mapping
DELETE /hotel
#酒店数据索引库
PUT /hotel
{
"settings": {
"analysis": {
"analyzer": {
"text_anlyzer": {
"tokenizer": "ik_max_word",
"filter": "py"
},
"completion_analyzer": {
"tokenizer": "keyword",
"filter": "py"
}
},
"filter": {
"py": {
"type": "pinyin",
"keep_full_pinyin": false,
"keep_joined_full_pinyin": true,
"keep_original": true,
"limit_first_letter_length": 16,
"remove_duplicated_term": true,
"none_chinese_pinyin_tokenize": false
}
}
}
},
"mappings": {
"properties": {
"id":{
"type": "keyword"
},
"name":{
"type": "text",
"analyzer": "text_anlyzer",
"search_analyzer": "ik_smart",
"copy_to": "all"
},
"address":{
"type": "keyword",
"index": false
},
"price":{
"type": "integer"
},
"score":{
"type": "integer"
},
"brand":{
"type": "keyword",
"copy_to": "all"
},
"city":{
"type": "keyword"
},
"starName":{
"type": "keyword"
},
"business":{
"type": "keyword",
"copy_to": "all"
},
"location":{
"type": "geo_point"
},
"pic":{
"type": "keyword",
"index": false
},
"all":{
"type": "text",
"analyzer": "text_anlyzer",
"search_analyzer": "ik_smart"
},
"suggestion":{
"type": "completion",
"analyzer": "completion_analyzer"
}
}
}
}
GET /hotel/_search
{
"query": {
"match_all": {}
}
}
GET /hotel/_search
{
"suggest": {
"suggestions": {
"text":"sd",
"completion":{
"field":"suggestion",
"skip_duplicates": true,
"size": 10
}
}
}
}
RestAPI实现自动补全
@Test
void testSuggest() throws IOException {
//1、准备Request
SearchRequest request = new SearchRequest("hotel");
//2.准备DSL
request.source().suggest(new SuggestBuilder().addSuggestion(
"suggestions",
SuggestBuilders.completionSuggestion("suggestion")
.prefix("hz")
.skipDuplicates(true)
.size(10)
));
//3、发起请求
SearchResponse response = client.search(request, RequestOptions.DEFAULT);
//4、解析结果
Suggest suggest = response.getSuggest();
//4.1根据补全查询名称,获取补全结果
CompletionSuggestion suggestions = suggest.getSuggestion("suggestions");
///4.2获取options
List<CompletionSuggestion.Entry.Option> options = suggestions.getOptions();
//4.3遍历
for ( CompletionSuggestion.Entry.Option option : options ) {
String text = option.getText().toString();
System.out.println("text = " + text);
}
}
实现酒店搜索框界面输入框的自动补全
数据同步问题分析
同步问题分析
数据同步问题分析
数据问题分析
利用 MQ实现mysql与elasticearch数据同步
@Configuration
public class MqConfig {
@Bean
public TopicExchange topicExchange(){
return new TopicExchange(MqConstants.HOTEL_EXCHANGE,true,false);
}
@Bean
public Queue insertQueue(){
return new Queue(MqConstants.HOTEL_INSERT_QUEUE,true);
}
@Bean
public Queue deleteQueue(){
return new Queue(MqConstants.HOTEL_DELETE_QUEUE,true);
}
@Bean
public Binding insertQueueBinding(){
return BindingBuilder.bind(insertQueue()).to(topicExchange()).with(MqConstants.HOTEL_INSERT_KEY);
}
@Bean
public Binding deleteQueueBinding(){
return BindingBuilder.bind(deleteQueue()).to(topicExchange()).with(MqConstants.HOTEL_DELETE_KEY);
}
}
@Component
public class HotelListener {
@Autowired
private IHotelService hotelService;
/**
* 监听酒店新增或修改的业务
* @param id 酒店id
*/
@RabbitListener(queues = MqConstants.HOTEL_INSERT_QUEUE)
public void listenHotelInsertOrUpdate(Long id){
hotelService.insertById(id);
}
/**
* 监听酒店删除业务
* @param id 酒店id
*/
@RabbitListener(queues = MqConstants.HOTEL_DELETE_QUEUE)
public void listenHotelDelete(Long id){
hotelService.deleteById(id);
}
}
@PutMapping()
public void updateById(@RequestBody Hotel hotel){
if (hotel.getId() == null) {
throw new InvalidParameterException("id不能为空");
}
hotelService.updateById(hotel);
rabbitTemplate.convertAndSend(
MqConstants.HOTEL_EXCHANGE,
MqConstants.HOTEL_INSERT_KEY,
hotel.getId());
}
@DeleteMapping("/{id}")
public void deleteById(@PathVariable("id") Long id) {
hotelService.removeById(id);
rabbitTemplate.convertAndSend(
MqConstants.HOTEL_EXCHANGE,
MqConstants.HOTEL_DELETE_KEY,
id);
}
ES集群机构
ES集群的节点角色
ES集群的分布式查询