当前位置: 首页>数据库>正文

es宽表构建 es 宽表

背景

oracle迁移到MySQL,单表数据量巨大(500w),导致查询sql巨慢,浏览器经常出现卡死现象。

索引优化

对目前的单表进行索引优化,无奈,sql查询条件无比复杂,导致优化空间有限。

加工宽表

将上述的查询结果加工成宽表, 当数据变化时,通过cannal 监听数据,并同步修改宽表,仍然有如下问题:

  • 单表查询条件无法,无法找到合适的索引配置。
  • 经常会有大批量数据改造,导致数据同步缓慢甚至卡死现象。

分库分表

经过跟客户的长时间沟通,客户同意根据某一个字段地市进行数据分片,奈何该省数据分布不均匀,根据该字段进行分表时,仍有地市数据超过200w.
尝试跟客户沟通,增加数据分片键,未果!

最终,使用es-索引外置来解决该问题。

ES索引外置

将所有的查询条件和主键,存储在ES中,在ES中进行查询定位主键id 以及分页相关信息,
然后再通过获取的主键id,到mysql查询数据。

创建索引

PUT 192.168.129.157:9200/tb_client_all_condition_1102

{
	"settings":{
        "index":{"number_of_shards":3, "number_of_replicas":2}
    },
    "mappings":{
       "tb1" : {
           "properties": {
               "client_id": {"type":"long"},
               "client_name":{"type":"keyword"},
               "industry_type":{"type":"keyword"},
               "scenes_type":{"type":"keyword"},
               "org_uscc":{"type":"keyword"},
               "area_code":{"type":"integer"},
               "county_code":{"type":"integer"},
               "register_date":{"type":"date",  "format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis"},
               "bd_location":{"type":"text"},
               "bd_location_geo":{"type":"geo_point"},
               "is_ctl_client":{"type":"keyword"},
               "status_cd":{"type":"keyword"},
               "regno":{"type":"keyword"},
               "licid":{"type":"keyword"},
               "crm_cert_num_7":{"type":"keyword"},
               "is_virt_cust":{"type":"keyword"},
               "province_id":{"type":"keyword"},
               "latn_code":{"type":"keyword"},
               "mkt_area_code":{"type":"keyword"},
               "dept_code":{"type":"keyword"},
               "group_code":{"type":"keyword"},
               "duty_zone_id":{"type":"keyword"},
               "duty_zone_code":{"type":"keyword"},
               "x_hx5_id":{"type":"keyword"},
               "region_code":{"type":"keyword"},
               "state":{"type":"keyword"},
               "cust_claim_falg":{"type":"keyword"},
               "cust_claim_opt_date":{"type":"date",  "format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis"},
               "cust_id":{"type":"keyword"}
           }
       }
   }
}

注意插入geo_point字段数据时,可以通过如下几种方式

### 纬度(lat) 经度(lon) 键值对 { "bd_location": { "lat" : 30.9269110, "lon" : 118.3402890 } } ### 字符串: "维度,经度" {"bd_location":"30.926911,118.340289"} ### 数组: [经度,纬度] {"bd_location":[118.340289,30.926911]}

数据导入(logstash)

安装logstash

安装(略)

logstash-input-jdbc插件

[logstash@s156 sync_xj_mysql]$ logstash-plugin install logstash-input-jdbc

配置jdbc.conf

input {
    stdin {
    }
    jdbc {
      jdbc_connection_string => "jdbc:mysql://192.168.128.108:3306/gec_xj?characterEncoding=UTF-8&useSSL=false&autoReconnect=true"
      jdbc_user => "xj2020"
      jdbc_password => "xj2020"
      jdbc_driver_library => "mysql-connector-java-5.1.46.jar"
      jdbc_driver_class => "com.mysql.jdbc.Driver"
      jdbc_paging_enabled => "true"
      jdbc_page_size => "100"
      codec => plain { charset => "UTF-8"}
 
      #使用其它字段追踪,而不是用时间
      use_column_value => true
      #追踪的字段      
      tracking_column => client_id      
      record_last_run => true     
      #上一个sql_last_value值的存放文件路径, 必须要在文件中指定字段的初始值     
      last_run_metadata_path => "/home/logstash/sync_xj_mysql/station_parameter.txt"
 
      jdbc_default_timezone => "Asia/Shanghai"
      statement_filepath => "jdbc.sql"
      
      #是否清除 last_run_metadata_path 的记录,如果为真那么每次都相当于从头开始查询所有的数据库记录
      clean_run => false
 
      # 这是控制定时的,重复执行导入任务的时间间隔,第一位是分钟
      schedule => "*/1 * * * *"
      type => "jdbc"
    }
}
 
filter {
        json {
                source => "message"
                remove_field => ["message"]
         }

         mutate {
                remove_field => ["@timestamp","type","@version"]
#               # 经纬度
                split => ["bd_location" ,","]
                # es geo字段为字符串类型时,是“维度,经度”
                add_field => ["bd_location_geo","%{bd_location[1]},%{bd_location[0]}"]
        }
}
 
output {
   elasticsearch {
        hosts => "192.168.129.157:9200"
        index => "tb_client_all_condition_1102"
        document_type => "tb1"
#        # 主键名称(类似数据库主键)
        document_id => "%{client_id}"
    }
}

配置jdbc.sql

select b.* from  tb_client_all_condition b where b.client_id > :sql_last_value

配置序列文件

> vim station_parameter.txt 
1

DSL

{
   "query" : {
      "bool" : {
         "filter" : [
             {"term" : { "latn_code" : "551"}},
             {"term" : { "county_code" : "340111"}},
             {"terms" : { "scenes_type" : ["01-010-005","01-010-007","01-010-008"]}},
             {"term" : { "mkt_area_code" : "300551009094" }},
             {"term" : {"dept_code" : "400551034923"}},
             {"term" : {"duty_zone_code" : "600551131802"}},
             {"term" : {"duty_zone_id" : "1383248"}},
             {"term" : {"region_code" : "510792539"}},
             {"term" : {"is_ctl_client" : "N"}},
         	 {"term" : {"industry_type" : "01-010"}},
           	 {"prefix" : {"client_name" : "颍上县"}},
           	 {"geo_distance": {"distance": "20km","bd_location_geo": "32.635626,116.264753"}},
           	 {"range":{"register_date":{"gt":"2020-01-01","lt":"2020-02-02"}}}
         ]
      }
   },
   "from" : 0,
   "size" : 5,
   "sort" : ["duty_zone_code"],
   "_source":{
   		"include":["client_id","*_code"],
   		"exclude":["mkt_area_code"]
   },
     "track_total_hits":true
}

Java API

@Test
    public void test06_multi_qry() {
        String latn_code="551";
        String county_code="340111";
        String scenes_type_s="01-010-005,01-010-007,01-010-008";

        BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
        if (!StringUtil.isNullOrEmpty(latn_code)) {
            boolQueryBuilder.filter(QueryBuilders.termQuery("latn_code", latn_code));
        }
        if (!StringUtil.isNullOrEmpty(county_code)) {
            boolQueryBuilder.filter(QueryBuilders.termQuery("county_code", county_code));
        }

        if (!StringUtil.isNullOrEmpty(scenes_type_s)) {
            boolQueryBuilder.filter(QueryBuilders.termsQuery("scenes_type", scenes_type_s.split(",")));
        }

        String client_name="颍上县";
        if (!StringUtil.isNullOrEmpty(client_name)) {
            boolQueryBuilder.filter(QueryBuilders.prefixQuery("client_name",client_name));
        }

        //纬经度
        String bd_location_geo = "32.635626,116.264753";
        String distance = "10km";
        if (!StringUtil.isNullOrEmpty(bd_location_geo) && !StringUtil.isNullOrEmpty(distance)) {
            boolQueryBuilder.filter(QueryBuilders.geoDistanceQuery("bd_location_geo").point(new GeoPoint(bd_location_geo)).distance(distance));
        }

       /* String register_date_start = "2010-01-01";
        String register_date_end = "2019-12-31";
        if (!StringUtil.isNullOrEmpty(register_date_start) || !StringUtil.isNullOrEmpty(register_date_end)) {
            RangeQueryBuilder rangeQueryBuilder = QueryBuilders.rangeQuery("register_date");
            if (!StringUtil.isNullOrEmpty(register_date_start)){
                rangeQueryBuilder.gte(register_date_start);
            }

            if (!StringUtil.isNullOrEmpty(register_date_end)){
                rangeQueryBuilder.lte(register_date_end);
            }

            boolQueryBuilder.filter(rangeQueryBuilder);
        }*/

        SortBuilder sortBuilder = SortBuilders.fieldSort("client_id").order(SortOrder.ASC);

//        String[] includes = {"client_id","*_code"};
//        String[] excludes = {"mkt_area_code"};
        SearchResponse result = client.prepareSearch(DEV_ES_INDEX)
                .setTypes(DEV_ES_TYPE)
                .setSearchType(SearchType.DFS_QUERY_THEN_FETCH)
                .setQuery(boolQueryBuilder)
                .setFrom(PAGE_SIZE * (PAGE_NUM > 0 ? PAGE_NUM -1 : 0))
                .setSize(PAGE_SIZE)
                .addSort(sortBuilder)
//                .setFetchSource(includes,excludes)
                .setFetchSource("client_id",null)
                .setTrackTotalHits(true)
                .get();


        SearchHits hits =result.getHits();
        long total = hits.getTotalHits();
        System.out.println("------总共有:"+total+"条记录,共"+ calPages(total)+"页,每页"+PAGE_SIZE+"条 "+",当前页【"+PAGE_NUM+"】,耗时"+result.getTook().toString());

        hits.forEach(searchHit -> {
            System.out.println(searchHit.getSourceAsMap());
        });
    }

    final int PAGE_NUM = 2;
    final int PAGE_SIZE = 5;
    private int calPages(long total) {
        return (int)Math.ceil(Double.valueOf(total) / Double.valueOf(PAGE_SIZE));
    }

执行结果:

es宽表构建 es 宽表,es宽表构建 es 宽表_mysql,第1张

HighLevelClient

@Test
    public void rest_multi_qry_2() throws IOException {
        String latn_code = "551";
        String county_code = "340111";
        String scenes_type_s = "01-010-005,01-010-007,01-010-008";

        BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
        if (!StringUtil.isNullOrEmpty(latn_code)) {
            boolQueryBuilder.filter(QueryBuilders.termQuery("latn_code", latn_code));
        }
        if (!StringUtil.isNullOrEmpty(county_code)) {
            boolQueryBuilder.filter(QueryBuilders.termQuery("county_code", county_code));
        }

        if (!StringUtil.isNullOrEmpty(scenes_type_s)) {
            boolQueryBuilder.filter(QueryBuilders.termsQuery("scenes_type", scenes_type_s.split(",")));
        }

        String client_name = "颍上县";
        if (!StringUtil.isNullOrEmpty(client_name)) {
            boolQueryBuilder.filter(QueryBuilders.prefixQuery("client_name", client_name));
        }

        //纬经度
        String bd_location_geo = "32.635626,116.264753";
        String distance = "10km";
        if (!StringUtil.isNullOrEmpty(bd_location_geo) && !StringUtil.isNullOrEmpty(distance)) {
            boolQueryBuilder.filter(QueryBuilders.geoDistanceQuery("bd_location_geo").point(new GeoPoint(bd_location_geo)).distance(distance));
        }

       /* String register_date_start = "2010-01-01";
        String register_date_end = "2019-12-31";
        if (!StringUtil.isNullOrEmpty(register_date_start) || !StringUtil.isNullOrEmpty(register_date_end)) {
            RangeQueryBuilder rangeQueryBuilder = QueryBuilders.rangeQuery("register_date");
            if (!StringUtil.isNullOrEmpty(register_date_start)){
                rangeQueryBuilder.gte(register_date_start);
            }

            if (!StringUtil.isNullOrEmpty(register_date_end)){
                rangeQueryBuilder.lte(register_date_end);
            }

            boolQueryBuilder.filter(rangeQueryBuilder);
        }*/

        //数据权限


        SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
        sourceBuilder.fetchSource(new String[]{"client_id"}, null)
                .from(PAGE_SIZE * (PAGE_NUM > 0 ? PAGE_NUM - 1 : 0))
                .size(PAGE_SIZE)
                .sort(new FieldSortBuilder("client_id").order(SortOrder.ASC))
                .trackTotalHits(true)
                .query(boolQueryBuilder)
                .timeout(new TimeValue(60, TimeUnit.SECONDS));
        System.out.println(sourceBuilder);

        SearchRequest searchRequest = new SearchRequest(DEV_ES_INDEX).types(DEV_ES_TYPE).source(sourceBuilder);
        SearchResponse result = client.search(searchRequest, RequestOptions.DEFAULT);

        SearchHits hits = result.getHits();
        long total = hits.getTotalHits();
        System.out.println("------总共有:" + total + "条记录,共" + calPages(total) + "页,每页" + PAGE_SIZE + "条 " + ",当前页【" + PAGE_NUM + "】,耗时" + result.getTook().toString());

        hits.forEach(searchHit -> {
            System.out.println(searchHit.getSourceAsMap());
        });
    }



https://www.xamrdz.com/database/6ug1957459.html

相关文章: