Elasticsearch-Api-文档操作
Elasticsearch-Api-文档操作
Index 里面单条的记录称为 Document(文档)。许多条 Document 构成了一个 Index。
读写文档
Elasticsearch Guide [7.17] » REST APIs » Document APIs » Reading and Writing documents
https://www.elastic.co/guide/en/elasticsearch/reference/7.17/docs-replication.html
基本写模型
1、协调阶段(coordinating):根据路由规则将文档路由到主分片
2、主分片处理阶段(primary):验证文档,在主分片执行操作,转发到 in-sync 副本
/index/_doc 创建文档
https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-index_.html
API
POST /<target>/_doc/ 自动生成_id创建文档
PUT /<target>/_doc/<_id> 指定_id创建文档
PUT /<target>/_create/<_id> 只有当_id对应的文档不存在时才创建,否则报错
POST /<target>/_create/<_id> 只有当_id对应的文档不存在时才创建,否则报错
PUT /index/_doc/_id 指定_id创建文档
向指定的 /Index/Type/ID
发送 PUT 请求,就可以在 Index 里面新增一条记录。
ID 是调用方指定的唯一ID,如果已存在,则会完全替换更新文档并增加其版本 version
**在 ElasticSearch 7.0 及以上的版本中已经把 type 这个概念了,统一用 “_doc” 这个占位符来表示 “_type”**,你可以把 _type
看作是文档就行了,相当于 ElasticSearch 7.0 及以上版本只有索引和文档这两个概念了。
curl -X PUT 'http://localhost:9200/article/_doc/1' \
-H 'Content-Type: application/json' \
-d '{
"title":"文章的标题",
"pathname":"/article/postlink",
"content":"美国留给伊拉克的是个烂摊子吗"
}'
返回
{
"_index": "article",
"_type": "_doc",
"_id": "1",
"_version": 1,
"result": "created",
"_shards": {
"total": 2,
"successful": 1,
"failed": 0
},
"_seq_no": 0,
"_primary_term": 1
}
POST /index/_doc 自动生成_id创建文档
新增记录的时候,也可以不指定 id,这时要改成 POST 请求。
向指定的 /Index/Type
发送 POST 请求,可以在 Index 里面新增一条记录,系统会自动生成唯一ID。
curl --location --request POST 'http://localhost:9200/article/_doc' \
--header 'Content-Type: application/json' \
--data-raw '{
"title":"es的使用",
"pathname":"/article/es",
"content":"新增记录的时候,也可以不指定 Id,这时要改成 POST 请求。"
}'
返回
{
"_index": "article",
"_type": "_doc",
"_id": "uv_ZjXEBrN9oq5tgVMuj",
"_version": 1,
"result": "created",
"_shards": {
"total": 2,
"successful": 1,
"failed": 0
},
"_seq_no": 0,
"_primary_term": 1
}
返回的 _id
是自动生成的唯一 id
refresh 刷新操作
https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-refresh.html
Elasticsearch 为了提高写入性能,会将文档写入操作(创建、更新、删除)的相关改变暂存到内存中的一个缓冲区里,然后在后台周期性地将这些改变刷新(refresh)到硬盘上的索引文件中。只有刷新操作完成后,这些改变才对搜索操作可见。
refresh 参数就是用来控制这个刷新操作的:
false
默认值,不执行刷新操作,这次写入的改变会在下次周期性刷新时被应用(1秒钟间隔)。true
立即执行刷新操作,使得这次写入的改变对搜索操作立即可见。wait_for
等待直到这次写入的改变被刷新并对搜索操作可见,es内部自动刷新默认是1秒钟间隔
频繁执行刷新操作会对 Elasticsearch 的性能产生影响,因此在大量写入操作时,通常是使用默认设置(false),即在后台周期性地执行刷新。
只有在某些需要改变立即对搜索可见的场景下,才会设置 refresh 参数为 true 或者 wait_for。
version 外部版本号
https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-index_.html#index-versioning
ES 允许不使用内置的 version 进行版本控制,可以自定义使用外部的 version,此时将 version_type
设为 external
,此时可传入一个大于 0 小于 9.2e+18 的 long 型 version
参数。
使用外部版本号时,只有当你提供的 version 比当前文档的 _version 大的时候,才能完成修改(包括删除)。
例如常见的双写方案,MySQL 和 ES 各存一份数据,ES 用于加速查询,此时可以将 version 维护在 MySQL 中。
例如:
PUT my-index-000001/_doc/1?version=2&version_type=external
{
"user": {
"id": "elkbee"
}
}
POST /index/_update/_id 指定id更新文档
Elasticsearch Guide [7.16] » REST APIs » Document APIs » Update API
https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-update.html
POST /<index>/_update/<_id>
之前版本的api是 POST /index/_doc/_id/_update
通过脚本更新文档。脚本可更新、删除或跳过文档。更新 API 还支持传入一部分文档内容,最终合并到已存在文档中。如果想完全替换更新已存在文档,使用带 ID 的文档创建 API 即可。
update API 实现的逻辑中,其实可以理解为三步操作:
1、qeury:通过文档 ID 去 GET 文档,此时可获取文档的 _version
版本
2、update:根据 script 脚本来更新 document;
3、reindex:将更新后的 document 重新写回到索引,
如果在 GET 和 Reindex 期间,文档被更新,_version
值发生变化,则更新失败。可以使用 retry_on_conflict
参数来设置当发生更新上述情况更新失败时,自动重试的次数。retry_on_conflict
的默认值为0,即不重试。
因此,ES 的 update API 依然是需要对文档做一次完全的 reindex 操作,而不是直接去修改原始document。但 update API 所能做的是减少了网络交互次数,当然这比起我们自己通过index获取数据并在业务代码中更新再写回到ES来实现,大大的减少了版本冲突的概率。
在遇到版本冲突问题时,ES 将会返回 409 Conflict HTTP 错误码。因此,当遇到 409 后,为了保证数据的最终插入,我们就必须要考虑到 retry 机制。为了实现冲突后的retry,有两种方案来实现:
1、业务代码自定义
通过识别 409 错误,在业务代码中,跟据自己的需求来进行 retry。因为是自定义的逻辑,所以我们可以任意的操作 retry 的回退策略,以及 retry 的内容等;
2、retry_on_conflict
通过在参数中指定来实现 retry_on_conflict 来实现
script 脚本更新
将文档 1 的 counter 值加4
POST test/_update/1
{
"script" : {
"source": "ctx._source.counter += params.count",
"lang": "painless",
"params" : {
"count" : 4
}
}
}
doc 文档部分值更新
更新文档 1 的 name 值:
- 如果原来 name 没有值或者没有 name 字段,会新增 name 字段
- 如果 name 字段有值,会更新 name 字段的值
POST test/_update/1 { "doc": { "name": "new_name" } }
POST /index/_update_by_query 根据查询更新
Elasticsearch Guide [7.16] » REST APIs » Document APIs » Update By Query API
https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-update-by-query.html
conflicts=proceed 版本冲突时继续处理
提交 _update_by_query 请求后,Elasticsearch 先获取索引数据的当前快照,然后使用 内部版本 _version
更新和 query 匹配的文档:
- 如果
_version
能匹配则更新文档,然后增加_version
版本号。 - 如果在建立快照和更新当前文档之间文档被更新,会出现
_version
不匹配,导致版本冲突,更新操作失败。可以将 conflicts 设为 proceed 在冲突时继续处理,response 返回冲突个数。
处理 _update_by_query 请求时,Elasticsearch 内部进行多批次 search 查询请求来匹配满足条件的文档,然后在每一批匹配的文档上执行一次 bulk 更新请求。
conflicts
参数,遇到版本冲突时如何处理:
abort
默认值,报错proceed
继续处理,返回 response 中有冲突文档个数。
ignore_unavailable=true 索引不存在时不报错
POST /my_index/_update_by_query?ignore_unavailable=true
Java 代码中:
UpdateByQueryRequest request = new UpdateByQueryRequest(index);
request.setIndicesOptions(IndicesOptions.lenientExpandOpen()); // 索引不存在时不报错
/_update_by_query 可用于热更新ik词库后重建索引
POST /my_index/_update_by_query?conflicts=proceed
可用于 ik 词库热更新后,重建索引,使词库中新加的单词生效
script 脚本条件更新
删除 status=published,且发布时间在指定范围的数据
POST /my_index/_update_by_query
{
"query": {
"bool": {
"must": [{
"term": {
"status": "published"
}
},
{
"range": {
"publish_time": {
"lt": "2023-10-01 00:00:00",
"gte": "2023-09-20 00:00:00"
}
}
}]
}
},
"script": {
"lang": "painless",
"source": "ctx._source.key1 = ctx._source.key2; ctx._source.key3.subkey31 = ctx._source.key4.subkey41;"
}
}
删除 dense_vector 向量字段
删除 title_vector 和 content_vector 两个 dense_vector 向量字段
POST /my_index/_update_by_query
{
"script": {
"source": "ctx._source.remove('title_vector'); ctx._source.remove('content_vector');",
"lang": "painless"
},
"query": {
"term": {
"id": "e76dc6ec97ad415882658dd62bcf69e3"
}
}
}
GET /index/_doc/_id 根据id查询文档
Elasticsearch Guide [7.16] » REST APIs » Document APIs » Get API
https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-get.html
GET <index>/_doc/<_id>
根据 ID 查询文档
ES 文档上的每一次写操作,包括删除,都会使文档的 _version
递增,已删除文档的 version 会在一小段时间内保持可见,时间由配置项 index.gc_deletes
决定,默认是 60 秒。
例如 GET 'http://localhost:9200/article/_doc/uv_ZjXEBrN9oq5tgVMuj'
返回
{
"_index": "article",
"_type": "_doc",
"_id": "uv_ZjXEBrN9oq5tgVMuj",
"_version": 1,
"_seq_no": 0,
"_primary_term": 1,
"found": true,
"_source": {
"title": "es的使用",
"pathname": "/article/es",
"content": "新增记录的时候,也可以不指定 Id,这时要改成 POST 请求。"
}
}
ID 不存在时,返回 "found": false
{
"_index": "article",
"_type": "_doc",
"_id": "1",
"found": false
}
DELETE /index/_doc/_id 删除文档
Elasticsearch Guide [8.1] » REST APIs » Document APIs » Delete API
https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-delete.html
DELETE /<index>/_doc/<_id>
通过修改 version
进行删除,异步合并 Segment 时才真正删除。
POST /index/_delete_by_query 根据条件删除
https://www.elastic.co/guide/en/elasticsearch/reference/7.6/docs-delete-by-query.html
POST /<target>/_delete_by_query
根据条件删除文档,使用和 search 接口相同的查询条件语法,可使用 URI 条件或 body 条件。
例如
POST /my-index-000001/_delete_by_query
{
"query": {
"match": {
"user.id": "elkbee"
}
}
}
删除索引中的全部文档(清空索引)
使用空条件删除即可
POST /my-index-000001/_delete_by_query
{
"query": {
"match_all": {}
}
}
wait_for_completion=false 异步删除
Elasticsearch 的 /_delete_by_query 接口默认是同步执行的,它会等待所有的匹配文档被处理后才返回。这意味着如果查询命中的数据量很大,那么删除操作可能会花费相当长的时间。
可以通过在请求中设置 wait_for_completion=false 参数来将其变为异步操作。在这种情况下,Elasticsearch 会先做一些请求合法性检查,然后立即返回一个任务ID,你可以使用这个ID来获取删除操作的进度或取消任务。内部是在 .tasks/task/${taskId}
索引中插入了一个文档来记录删除任务。
ignore_unavailable=true 索引不存在时不报错
默认情况下,如果索引不存在,返回 index_not_found_exception 错误
如果希望在索引不存在时不报错,你可以使用 ignore_unavailable=true 选项。这个选项会让 Elasticsearch 忽略那些在执行操作时不存在的索引。
POST /your_index/_delete_by_query?ignore_unavailable=true
{
"query": {
"match_all": {}
}
}
java中:
DeleteByQueryRequest request = new DeleteByQueryRequest("your_index");
request.setQuery(QueryBuilders.matchAllQuery());
request.setIndicesOptions(IndicesOptions.lenientExpandOpen()); // 索引不存在时不报错
BulkByScrollResponse response = client.deleteByQuery(request, RequestOptions.DEFAULT);
slices 划分子任务
_delete_by_query 接口的 slices 参数用于将删除操作划分为多个并行任务以提高性能。这个参数的值可以是一个整数或者是 auto。
默认值是 1,表示不会划分子任务。
当你设定 slices 参数为一个大于1的整数,比如设定为5,那么 Elasticsearch 将把需要删除的文档划分为5个分片,每个分片都会并行地进行删除操作。这种方式可以有效地提高大规模删除操作的效率。
如果你设定 slices 参数为 auto,那么 Elasticsearch 会自动选择合适的分片数量,这个数量通常是索引的分片(shards)数。
需要注意的是,虽然增加 slices 的数量可以提高删除操作的速度,但是也会增加 Elasticsearch 集群的负载。因此,你需要根据你的集群的性能和负载情况来合理设定这个参数。
此外,slices 参数并不是越大越好。如果你的删除操作涉及的文档数量本身就不大,那么增加 slices 的数量并不会带来显著的性能提升,反而可能会因为创建过多的并行任务而浪费资源。
总的来说,slices 参数是一个用于优化删除操作性能的工具,你需要根据你的实际情况来合理使用它。
scroll 和 scroll_size 批次间隔与数量
由于删除的数据量可能很大,无法一次性处理删除,_delete_by_query 内部使用 scroll 进行批量查询并删除。
- scroll 参数的值为一个时间值,表示上下文的保持时间,默认值 5 分钟。例如,如果设置scroll=1m,则表示上下文将保持1分钟。
- scroll_size 参数用于控制每个批次检索和删除的文档数量,默认值 1000
scroll 是 Elasticsearch 中用于处理大量数据的机制,它可以在多个请求之间保留搜索上下文,以便能够获取到所有符合查询条件的文档,而不仅仅是返回的第一批文档。
scroll 参数的保持时间不宜设置得过长,因为保持搜索上下文会占用资源。
scroll_size 参数决定了每次批量操作的文档数量。例如,如果设置 scroll_size 为 1000,那么每个批次将处理 1000 个文档,然后再处理下一个批次。
这个参数可以根据你的需求和 Elasticsearch 集群的能力来调整。如果你设置的 scroll_size 太大,可能会消耗大量的内存和CPU资源,从而导致性能问题。反之,如果 scroll_size 太小,那么每次处理的文档数量就会很少,从而需要更多的批次来完成所有文档的处理,这可能会导致整个操作的效率较低。
因此,你需要根据你的情况来适当地设置 scroll_size 参数,以在性能和效率之间找到一个平衡。
requests_per_second 限制删除速度
requests_per_second 参数在 Elasticsearch 的删除查询接口中用于控制删除操作的吞吐量。这个参数的目标是防止删除操作过于频繁或过快地执行,从而影响 Elasticsearch 集群的性能。
这个参数是一个可选的参数,它的默认值是-1,表示删除操作将尽可能快地执行。你可以设置这个参数为任意浮点数,比如1.5,这表示 Elasticsearch 将每秒执行1.5次删除操作。
条件删除原理
_delete_by_query 并不是真正意义上物理文档删除,而是只是版本变化并且对文档增加了删除标记。当我们再次搜索的时候,会搜索全部然后过滤掉有删除标记的文档。因此,该索引所占的空间并不会随着该 API 的操作磁盘空间会马上释放掉,只有等到下一次段合并的时候才真正被物理删除,这个时候磁盘空间才会释放。相反,在被查询到的文档标记删除过程同样需要占用磁盘空间,这个时候,你会发现触发该 API 操作的时候磁盘不但没有被释放,反而磁盘使用率上升了。
POST /_bulk 批量操作
Elasticsearch Guide [7.17] » REST APIs » Document APIs » Bulk API
https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html
POST /_bulk
POST /<target>/_bulk
注意:
- _bulk 批量操作操作使用 NDJSON(Newline Delimited JSON) 格式的请求体,批量操作的 body 必须是一行 action 紧接着一行数据(delete 不需要数据),数据必须在一行中且中间不能换行,一行数据结束后必须换行才能接下一个 action,且最后必须以一个空行结束
- _bulk 批量操作的 HTTP 请求
Content-Type
可以使用application/json
或application/x-ndjson
例如
POST _bulk
{ "index" : { "_index" : "test", "_id" : "1" } }
{ "field1" : "value1" }
{ "index" : { "_index" : "test"} }
{ "field1" : "value1" }
{ "delete" : { "_index" : "test", "_id" : "2" } }
{ "create" : { "_index" : "test", "_id" : "3" } }
{ "field1" : "value3" }
{ "update" : {"_id" : "1", "_index" : "test"} }
{ "doc" : {"field2" : "value2"} }
或
{ "index": { "_index": "mytest", "_id": "1" } }
{ "content": "美国留给伊拉克的是个烂摊子吗" }
{ "index": { "_index": "mytest", "_id": "2" } }
{ "content": "公安部:各地校车将享最高路权" }
{ "index": { "_index": "mytest", "_id": "3" } }
{ "content": "中韩渔警冲突调查:韩警平均每天扣1艘中国渔船" }
{ "index": { "_index": "mytest", "_id": "4" } }
{ "content": "中国驻洛杉矶领事馆遭亚裔男子枪击 嫌犯已自首" }
返回体中,每个操作有一个单独的结果,按入参请求顺序排列,各个操作是否成功互不影响
例如
{
"took": 131,
"errors": false,
"items": [
{
"index": {
"_index": "index_ik",
"_id": "1",
"_version": 1,
"result": "created",
"_shards": {
"total": 2,
"successful": 1,
"failed": 0
},
"_seq_no": 0,
"_primary_term": 1,
"status": 201
}
},
{
"index": {
"_index": "index_ik",
"_id": "2",
"_version": 1,
"result": "created",
"_shards": {
"total": 2,
"successful": 1,
"failed": 0
},
"_seq_no": 1,
"_primary_term": 1,
"status": 201
}
}
]
}
refresh=true 操作后立即对搜索可见
默认情况下, /bulk 操作后如果立即执行 /_search 或 /_count 可能无法检索到操作的数据,因为索引分片还未刷新。
refresh 参数可控制索引刷新:
true
在该批次操作完成后,立即刷新所有相关的分片。这意味着,在该批次操作后,所有的更改都将立即对搜索可见。false
默认值,在该批次操作完成后,不立即刷新相关的分片。Elasticsearch 将按照其默认的刷新间隔进行刷新。这也是默认行为。wait_for
在该批次操作完成后,等待自动刷新使得更改对搜索可见,之后再返回。这意味着,请求将在刷新完成后才返回。
所以,refresh=true 或 wait_for 时,请求返回后都立即对搜索可见,只不过 refresh=true 内部会触发立即刷新,而 refresh=wait_for 内部是等自动刷新完成。
注意,refresh=true 或 wait_for 时,只会等待批量操作文档路由到的相关的 shards 刷新,而不是等待全部 shards 刷新
Java 代码中设置 refresh
BulkRequest bulkRequest = new BulkRequest();
bulkRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
enum RefreshPolicy implements Writeable {
/**
* Don't refresh after this request. The default.
*/
NONE("false"),
/**
* Force a refresh as part of this request. This refresh policy does not scale for high indexing or search throughput but is useful
* to present a consistent view to for indices with very low traffic. And it is wonderful for tests!
*/
IMMEDIATE("true"),
/**
* Leave this request open until a refresh has made the contents of this request visible to search. This refresh policy is
* compatible with high indexing and search throughput but it causes the request to wait to reply until a refresh occurs.
*/
WAIT_UNTIL("wait_for");
}
Malformed action/metadata line
原因:批量操作的 body 必须是一行 action 一行数据(delete 不需要数据),数据必须在一行中且中间不能换行,一行数据结束后必须换行才能接下一个 action,且最后必须以一个空行结束
比如
{ "index": {"_index": "user_profile", "_type": "base_info", "_id": 1234567 } }
{ "user_id": 1234567 }
是正确的,但如果 改为
{ "index": {"_index": "user_profile", "_type": "base_info", "_id": 1234567 } }
{
"user_id": 1234567
}
就会报下面的错误
{
"error": {
"root_cause": [
{
"type": "illegal_argument_exception",
"reason": "Malformed action/metadata line [3], expected START_OBJECT but found [VALUE_STRING]"
}
],
"type": "illegal_argument_exception",
"reason": "Malformed action/metadata line [3], expected START_OBJECT but found [VALUE_STRING]"
},
"status": 400
}
如果请求 body 的最后没有换行 \n
,就会报下面的错误:
{
"error":{
"root_cause":[
{
"type":"illegal_argument_exception",
"reason":"The bulk request must be terminated by a newline [
]"
}
],
"type":"illegal_argument_exception",
"reason":"The bulk request must be terminated by a newline [
]"
},
"status":400
}
BULK API : Malformed action/metadata line [3], expected START_OBJECT but found [VALUE_STRING]
https://stackoverflow.com/questions/45792309/bulk-api-malformed-action-metadata-line-3-expected-start-object-but-found
乐观并发控制
Elasticsearch Guide [7.17] » REST APIs » Document APIs » Optimistic concurrency control
https://www.elastic.co/guide/en/elasticsearch/reference/7.17/optimistic-concurrency-control.html
每个文档都有一个 _version
版本号,当文档被修改时版本号递增。 Elasticsearch 使用这个 _version
号来确保变更以正确顺序得到执行。如果旧版本的文档在新版本之后到达,它可以被简单的忽略。
为了避免丢失数据, 更新 API 会在获取步骤中获取当前文档中的 _version
,然后将其传递给重新索引步骤中的 索引 请求。如果其他的进程在这两步之间修改了这个文档,那么 _version
就会不同,这样更新就会失败。
409/Conflict
2 个请求并发对同一个 id 的文档进行更新:
请求 1 获取文档版本号是 1
请求 2 获取文档版本号是 1
请求 2 重新索引文档,写入成功,版本号更新为 2
请求 1 重新索引文档时,发现已有的文档 版本号是 2,索引失败,返回 409 Conflict
上一篇 Java-线程与线程池
页面信息
location:
protocol
: host
: hostname
: origin
: pathname
: href
: document:
referrer
: navigator:
platform
: userAgent
: