1. 如果 插入es的BodyJson大于32766(utf-8),需要使用mapping创建
【Elasticsearch系列之四】腾讯云ES数据基本操作使用Elasticsearch(附Golang代码)迁移到es使用上遇到的问题elasticsearch如何修改mapping和template的方法elasticsearch - 如何使用Golang将数据导入ElasticsearchElasticSearch的Mapping之字段类型「Elasticsearch」ES重建索引怎么才能做到数据无缝迁移呢?
- keyword类型的最大支持的长度为——32766个UTF-8类型的字符。
也就是说term精确匹配的最大支持的长度为32766个UTF-8个字符
解决方式1:
如果该字段不需要被索引,可禁止对该字段进行索引。“index”: false
解决方式2:
如果该字段需要被索引,可对字段长度做个限制。“ignore_above”: 20 表示所有该字段的内容长度超过20字节时,该条记录会被插入,但同样不会被索引。
修改mapping之前,要把数据都清理掉,不然你就算put了新的mapping,他以前的数据也更正不过来,后进去的数据会跟随旧数据的存储类型 - 注意:索引模板仅在索引创建时应用,因此模板调整不会对已有的索引产生影响。
- 深究|Elasticsearch单字段支持的最大字符数
- text类型:支持分词、全文检索,不支持聚合、排序操作。
适合大字段存储,如:文章详情、content字段等; - keyword类型:支持精确匹配,支持聚合、排序操作。
适合精准字段匹配,如:url、name、title等字段。
- 一般情况,text和keyword共存
ES5.X版本以后,keyword支持的最大长度为32766个UTF-8字符,text对字符长度没有限制。 - 设置ignore_above后,超过给定长度后的数据将不被索引,无法通过term精确匹配检索返回结果。
- kibana查看mapping
GET 索引/表名/_mapping
//properties对应mapping里的properties内容,要进行分词和高亮因此要设置分词器和开启term_vector。
const mapping = `
{
"mappings": {
"properties": {
"id": {
"type": "long"
},
"title": {
"type": "text"
},
"genres": {
"type": "keyword"
}
}
}
}`
ctx := context.Background()
client, err := elastic.NewClient(elastic.SetURL(servers...))
if err != nil {
panic(err)
}
exists, err := client.IndexExists(indexName).Do(ctx)
if err != nil {
panic(err)
}
if !exists {
_, err := client.CreateIndex(indexName).BodyString(mapping).Do(ctx)
if err != nil {
panic(err)
}
}
上面这部分创建了client,用IndexExists检查索引是否存在。如果索引不存在用CreateIndex创建索引,mapping内容用BodyString传入。
2. ES数据库重建索引——Reindex(数据迁移)与提速
ES数据库重建索引——Reindex(数据迁移)
应用背景:
1、当你的数据量过大,而你的索引最初创建的分片数量不足,导致数据入库较慢的情况,此时需要扩大分片的数量,此时可以尝试使用Reindex。
2、当数据的mapping需要修改,但是大量的数据已经导入到索引中了,重新导入数据到新的索引太耗时;但是在ES中,一个字段的mapping在定义并且导入数据之后是不能再修改的,
所以这种情况下也可以考虑尝试使用Reindex。
Reindex:
ES提供了_reindex这个API。相对于我们重新导入数据肯定会快不少,实测速度大概是bulk导入数据的5-10倍。
数据迁移步骤:
1、创建新的索引(可以通过java程序也可以直接在head插件上创建)
注意:在创建索引的时候要把表结构也要创建好(也就是mapping)
2、复制数据
最简单、基本的方式:
1)代码请求:
POST _reindex
{
"source": {
"index": "old_index"
},
"dest": {
"index": "new_index"
}
}
2)利用命令:
curl _XPOST 'ES数据库请求地址:9200/_reindex' -d{"source":{"index":"old_index"},"dest":{"index":"new_index"}}
但如果新的index中有数据,并且可能发生冲突,那么可以设置version_type"version_type": "internal"或者不设置,则Elasticsearch强制性的将文档转储到目标中,覆盖具有相同类型和ID的任何内容:
POST _reindex
{
"source": {
"index": "old_index"
},
"dest": {
"index": "new_index",
"version_type": "internal"
}
}
数据迁移效率
问题发现:
常规的如果我们只是进行少量的数据迁移利用普通的reindex就可以很好的达到要求,但是当我们发现我们需要迁移的数据量过大时,我们会发现reindex的速度会变得很慢
数据量几十个G的场景下,elasticsearch reindex速度太慢,从旧索引导数据到新索引,当前最佳方案是什么?
原因分析:
reindex的核心做跨索引、跨集群的数据迁移。
慢的原因及优化思路无非包括:
1)批量大小值可能太小。需要结合堆内存、线程池调整大小;
2)reindex的底层是scroll实现,借助scroll并行优化方式,提升效率;
3)跨索引、跨集群的核心是写入数据,考虑写入优化角度提升效率。
可行方案:
1)提升批量写入大小值
默认情况下,_reindex使用1000进行批量操作,您可以在source中调整batch_size。
POST _reindex
{
"source": {
"index": "source",
"size": 5000
},
"dest": {
"index": "dest",
"routing": "=cat"
}
}
批量大小设置的依据:
1、使用批量索引请求以获得最佳性能。
批量大小取决于数据、分析和集群配置,但一个好的起点是每批处理5-15 MB。
注意,这是物理大小。文档数量不是度量批量大小的好指标。例如,如果每批索引1000个文档:
1)每个1kb的1000个文档是1mb。
2)每个100kb的1000个文档是100 MB。
这些是完全不同的体积大小。
2、逐步递增文档容量大小的方式调优。
1)从大约5-15 MB的大容量开始,慢慢增加,直到你看不到性能的提升。然后开始增加批量写入的并发性(多线程等等)。
2)使用kibana、cerebro或iostat、top和ps等工具监视节点,以查看资源何时开始出现瓶颈。如果您开始接收EsRejectedExecutionException,您的集群就不能再跟上了:至少有一个资源达到了容量。
要么减少并发性,或者提供更多有限的资源(例如从机械硬盘切换到ssd固态硬盘),要么添加更多节点。
2)借助scroll的sliced提升写入效率
Reindex支持Sliced Scroll以并行化重建索引过程。 这种并行化可以提高效率,并提供一种方便的方法将请求分解为更小的部分。
sliced原理(from medcl)
1)用过Scroll接口吧,很慢?如果你数据量很大,用Scroll遍历数据那确实是接受不了,现在Scroll接口可以并发来进行数据遍历了。
2)每个Scroll请求,可以分成多个Slice请求,可以理解为切片,各Slice独立并行,利用Scroll重建或者遍历要快很多倍。
slicing使用举例
slicing的设定分为两种方式:手动设置分片、自动设置分片。
手动设置分片参见官网。
自动设置分片如下:
POST _reindex?slices=5&refresh
{
"source": {
"index": "twitter"
},
"dest": {
"index": "new_twitter"
}
}
slices大小设置注意事项:
1)slices大小的设置可以手动指定,或者设置slices设置为auto,auto的含义是:针对单索引,slices大小=分片数;针对多索引,slices=分片的最小值。
2)当slices的数量等于索引中的分片数量时,查询性能最高效。slices大小大于分片数,非但不会提升效率,反而会增加开销。
3)如果这个slices数字很大(例如500),建议选择一个较低的数字,因为过大的slices 会影响性能。
效果
实践证明,比默认设置reindex速度能提升10倍+。
1. 详细步骤
1.为什么需要重建索引
举个例子,如果一个字段是text类型,如果想修改为Long类型,是不能直接修改的。
在重建的过程中,需要有别名的参与。
2.操作步骤
对当前的索引新建一个别名
新建一个新的索引,同步结构
同步数据
给新的索引见一个别名
删除老的索引的别名
删除老的索引
二:操作步骤详解
1.对当前的索引添加别名
POST /_aliases
{
"actions": [
{
"add": {
"index": "nba",
"alias": "nba_lastest"
}
}
]
}
2.新增⼀个索引(⽐如修改字段类型,jerseyNo改成keyword类型)
PUT /nba_20200202
{
"mappings": {
"properties": {
"age" : {
"type" : "integer"
},
"birthDay" : {
"type" : "date"
},
"birthDayStr" : {
"type" : "keyword"
},
"code" : {
"type" : "text"
},
"country" : {
"type" : "text"
},
"countryEn" : {
"type" : "text"
},
"displayAffiliation" : {
"type" : "text"
},
"displayName" : {
"type" : "text"
},
"displayNameEn" : {
"type" : "text"
},
"draft" : {
"type" : "long"
},
"heightValue" : {
"type" : "float"
},
"jerseyNo" : {
"type" : "keyword"
},
"playYear" : {
"type" : "long"
},
"playerId" : {
"type" : "keyword"
},
"position" : {
"type" : "text"
},
"schoolType" : {
"type" : "text"
},
"teamCity" : {
"type" : "text"
},
"teamCityEn" : {
"type" : "text"
},
"teamConference" : {
"type" : "keyword"
},
"teamConferenceEn" : {
"type" : "keyword"
},
"teamName" : {
"type" : "keyword"
},
"teamNameEn" : {
"type" : "keyword"
},
"weight" : {
"type" : "text"
}
}
}
}
3.给新的索引,同步数据
添加的参数,是异步执行的意思
POST /_reindex?wait_for_completion=false
{
"source": {
"index": "nba"
},
"dest": {
"index": "nba_20200202"
}
}
效果:
1
2
3
{
"task" : "O9L9cZRfQwWqC2UVUaxMaQ:18594274"
}
4.替换别名
POST /_aliases
{
"actions": [
{
"add": {
"index": "nba_20200202",
"alias": "nba_lastest"
}
},
{
"remove": {
"index": "nba",
"alias": "nba_lastest"
}
}
]
}
5.删除旧的索引
其实,这里不删除也行,但是会占用一些资源
DELETE /nba
6.进行验证
GET /nba_lastest/_search
{
"query": {
"match_all": {}
}
}
3.keyword实现类似分词查询
GET 索引/表/_search
{
"from":0,
"size":2,
"query":{
"bool":{
"filter":[
],
"must":[
{
"query_string":{
"query":"title:(*痛经*)",
"default_operator":"and"
}
}
]
}
},
"sort":{
"_uid":"desc"
}
}