次のAPIがElasticsearchのクエリによる削除を実行することがわかります - http://www.elasticsearch.org/guide/en/elasticsearch/reference/current/docs-delete-by-query.html
ただし、一括検索を使用してドキュメントをアップロードすることはできますが、エラスティック検索の一括APIでも同様にしたいのです。
es.bulk(body=json_batch)
Elastic検索用のpythonバルクAPIを使用して、クエリによる削除を呼び出す方法がわかりません。
回答:
回答№1の場合は7elasticsearchがどのようにクエリAPIによる削除を非推奨にしたかを見てください。わたしは作った このPythonスクリプト バインディングを使って同じことをする。 まずES接続を定義します。
import elasticsearch
es = elasticsearch.Elasticsearch(["localhost"])
これを使用して、削除したい結果に対するクエリを作成できます。
search=es.search(
q="The Query to ES.",
index="*logstash-*",
size=10,
search_type="scan",
scroll="5m",
)
これで、そのクエリをループ内でスクロールできます。それをしながら私たちの要求を生成します。
while True:
try:
# Git the next page of results.
scroll=es.scroll( scroll_id=search["_scroll_id"], scroll="5m", )
# Since scroll throws an error catch it and break the loop.
except elasticsearch.exceptions.NotFoundError:
break
# We have results initialize the bulk variable.
bulk = ""
for result in scroll["hits"]["hits"]:
bulk = bulk + "{ "delete" : { "_index" : "" + str(result["_index"]) + "", "_type" : "" + str(result["_type"]) + "", "_id" : "" + str(result["_id"]) + "" } }n"
# Finally do the deleting.
es.bulk( body=bulk )
バルクAPIを使用するには、2つのことを確認する必要があります。
- 文書が識別されます。更新します。 (インデックス、タイプ、ID)
- 各要求は改行または/ nで終了します。
回答№2については4
ザ elasticsearch-py
一括APIを使用すると、レコードを一括して削除できます。 "_op_type": "delete"
各レコードに。ただし、query-by-deleteを使用したい場合は、2つのクエリを実行する必要があります。1つは削除するレコードを取得するためのもの、もう1つは削除するためのものです。
これをまとめて行う最も簡単な方法は、Pythonモジュールを使うことです。 scan()
ElasticSearch Scroll APIをラップしているので、追跡する必要はありません。 _scroll_id
s。でそれを使用してください bulk()
非推奨の代替としてのヘルパー delete_by_query()
:
from elasticsearch.helpers import bulk, scan
bulk_deletes = []
for result in scan(es,
query=es_query_body, # same as the search() body parameter
index=ES_INDEX,
doc_type=ES_DOC,
_source=False,
track_scores=False,
scroll="5m"):
result["_op_type"] = "delete"
bulk_deletes.append(result)
bulk(elasticsearch, bulk_deletes)
以来 _source=False
が渡されると、ドキュメント本体は返されないので、それぞれの結果はかなり小さくなります。ただし、メモリの制約がある場合は、これを簡単にバッチ処理できます。
BATCH_SIZE = 100000
i = 0
bulk_deletes = []
for result in scan(...):
if i == BATCH_SIZE:
bulk(elasticsearch, bulk_deletes)
bulk_deletes = []
i = 0
result["_op_type"] = "delete"
bulk_deletes.append(result)
i += 1
bulk(elasticsearch, bulk_deletes)
回答№3の4
私は現在@drsレスポンスに基づいてこのスクリプトを使用していますが、 バルク() 一貫してヘルパー。それを使用してイテレータからジョブのバッチを作成する機能があります chunk_size
パラメータ(デフォルトは500です。 straming_bulk() 詳細については)。
from elasticsearch import Elasticsearch
from elasticsearch.helpers import scan, bulk
BULK_SIZE = 1000
def stream_items(es, query):
for e in scan(es,
query=query,
index=ES_INDEX,
doc_type=ES_DOCTYPE,
scroll="1m",
_source=False):
# There exists a parameter to avoid this del statement (`track_source`) but at my version it doesn"t exists.
del e["_score"]
e["_op_type"] = "delete"
yield e
es = Elasticsearch(host="localhost")
bulk(es, stream_items(es, query), chunk_size=BULK_SIZE)
回答№4の場合は1
ありがとう、これは本当に役に立ちました!
2つの提案があります。
スクロールで結果の次のページを取得するときは、
es.scroll(scroll_id=search["_scroll_id"])
は_scroll_id
最後のスクロールで返されます。検索で返されたものではありません。 Elasticsearchは毎回スクロールIDを更新するわけではありません。 この議論このコードはうまくいくかもしれませんが、それは絶対確実というわけではありません。検索コンテキストを長期間開いたままにするとコストがかかるため、スクロールをクリアすることが重要です。 Clear Scroll API - Elasticsearch APIドキュメント それらはタイムアウト後に最終的に閉じますが、例えばディスク容量が少なくなった場合、頭痛の種を減らすことができます。
簡単な方法は、移動中にスクロールIDのリストを作成し(重複を排除するようにしてください)、最後にすべてをクリアすることです。
es.clear_scroll(scroll_id=scroll_id_list)