/ / Elasticsearch API python: Supprimer des documents par requête - python, elasticsearch, pyes, pyelasticsearch

Elasticsearch python API: Supprimer des documents par requête - python, elasticsearch, pyes, pyelasticsearch

Je vois que l'API suivante supprimera par requête dans Elasticsearch - http://www.elasticsearch.org/guide/en/elasticsearch/reference/current/docs-delete-by-query.html

Mais je veux faire la même chose avec l’API de recherche en vrac élastique, même si je pouvais utiliser du vrac pour télécharger des documents à l’aide de

es.bulk(body=json_batch)

Je ne sais pas comment appeler suppression par requête à l'aide de l'API en vrac python pour la recherche Elastic.

Réponses:

7 pour la réponse № 1

Voir comment elasticsearch a déconseillé l’API de suppression par requête. j'ai crée ce script python en utilisant les liaisons pour faire la même chose. La première chose à définir une connexion ES:

import elasticsearch
es = elasticsearch.Elasticsearch(["localhost"])

Vous pouvez maintenant l'utiliser pour créer une requête pour les résultats que vous souhaitez supprimer.

search=es.search(
q="The Query to ES.",
index="*logstash-*",
size=10,
search_type="scan",
scroll="5m",
)

Vous pouvez maintenant faire défiler cette requête en boucle. Générez notre demande pendant que nous le faisons.

 while True:
try:
# Git the next page of results.
scroll=es.scroll( scroll_id=search["_scroll_id"], scroll="5m", )
# Since scroll throws an error catch it and break the loop.
except elasticsearch.exceptions.NotFoundError:
break
# We have results initialize the bulk variable.
bulk = ""
for result in scroll["hits"]["hits"]:
bulk = bulk + "{ "delete" : { "_index" : "" + str(result["_index"]) + "", "_type" : "" + str(result["_type"]) + "", "_id" : "" + str(result["_id"]) + "" } }n"
# Finally do the deleting.
es.bulk( body=bulk )

Pour utiliser l’API en vrac, vous devez vous assurer de deux choses:

  1. Le document est identifié que vous souhaitez mettre à jour. (index, type, id)
  2. Chaque demande se termine par un saut de ligne ou / n.

4 pour la réponse № 2

le elasticsearch-py API en bloc vous permet de supprimer des enregistrements en bloc en incluant "_op_type": "delete" dans chaque enregistrement. Toutefois, si vous souhaitez supprimer une requête à la fois, vous devez toujours effectuer deux requêtes: une pour récupérer les enregistrements à supprimer et une autre pour les supprimer.

Le moyen le plus simple de le faire en bloc est d’utiliser des modules python scan() helper, qui encapsule l’API ElasticSearch Scroll afin que vous ne soyez pas obligé de garder une trace de _scroll_ids. Utilisez-le avec le bulk() aide en remplacement de la obsolète delete_by_query():

from elasticsearch.helpers import bulk, scan

bulk_deletes = []
for result in scan(es,
query=es_query_body,  # same as the search() body parameter
index=ES_INDEX,
doc_type=ES_DOC,
_source=False,
track_scores=False,
scroll="5m"):

result["_op_type"] = "delete"
bulk_deletes.append(result)

bulk(elasticsearch, bulk_deletes)

Depuis _source=False est passé, le corps du document n’est pas retourné, donc chaque résultat est assez petit. Cependant, si vous avez des contraintes de mémoire, vous pouvez facilement le traiter par lots:

BATCH_SIZE = 100000

i = 0
bulk_deletes = []
for result in scan(...):

if i == BATCH_SIZE:
bulk(elasticsearch, bulk_deletes)
bulk_deletes = []
i = 0

result["_op_type"] = "delete"
bulk_deletes.append(result)

i += 1

bulk(elasticsearch, bulk_deletes)

4 pour la réponse № 3

J'utilise actuellement ce script basé sur la réponse @drs, mais en utilisant masse() aide constamment. Il est capable de créer des lots de travaux à partir d'un itérateur en utilisant chunk_size paramètre (par défaut à 500, voir straming_bulk () pour plus d'informations).

from elasticsearch import Elasticsearch
from elasticsearch.helpers import scan, bulk

BULK_SIZE = 1000

def stream_items(es, query):
for e in scan(es,
query=query,
index=ES_INDEX,
doc_type=ES_DOCTYPE,
scroll="1m",
_source=False):

# There exists a parameter to avoid this del statement (`track_source`) but at my version it doesn"t exists.
del e["_score"]
e["_op_type"] = "delete"
yield e

es = Elasticsearch(host="localhost")
bulk(es, stream_items(es, query), chunk_size=BULK_SIZE)

1 pour la réponse № 4

Merci, c'était vraiment utile!

J'ai deux suggestions:

  1. Lors de l'obtention de la page suivante de résultats avec scroll, es.scroll(scroll_id=search["_scroll_id"]) devrait être le _scroll_id retourné dans le dernier rouleau, pas celui que la recherche a renvoyé. Elasticsearch ne met pas à jour l'ID de défilement à chaque fois, en particulier pour les requêtes plus petites (voir cette discussion), donc ce code pourrait fonctionner, mais ce n’est pas infaillible.

  2. Il est important de supprimer les parchemins, car le maintien des contextes de recherche ouverts pendant longtemps a un coût. Clear Scroll API - Documentation sur l'API Elasticsearch Ils se fermeront éventuellement après expiration du délai, mais si vous manquez d'espace disque, par exemple, vous éviterez beaucoup de maux de tête.

Un moyen simple est de créer une liste d'identifiants de défilement lors de vos déplacements (assurez-vous de vous débarrasser des doublons!) Et de tout effacer à la fin.

es.clear_scroll(scroll_id=scroll_id_list)