/ / API python Elasticsearch: elimina i documenti per query: python, elasticsearch, pyes, pyelasticsearch

API python Elasticsearch: elimina i documenti per query: python, elasticsearch, pyes, pyelasticsearch

Vedo che la seguente API farà eliminare per query in Elasticsearch - http://www.elasticsearch.org/guide/en/elasticsearch/reference/current/docs-delete-by-query.html

Ma voglio fare lo stesso con l'API di ricerca elastica di massa, anche se potrei usare la massa per caricare i documenti usando

es.bulk(body=json_batch)

Non sono sicuro di come richiamare l'eliminazione per query utilizzando l'API bulk python per la ricerca elastica.

risposte:

7 per risposta № 1

Visto come elasticsearch ha deprecato l'eliminazione tramite l'API di query. ho creato questo script python usando le associazioni per fare la stessa cosa Per prima cosa definire una connessione ES:

import elasticsearch
es = elasticsearch.Elasticsearch(["localhost"])

Ora puoi usarlo per creare una query per i risultati che desideri eliminare.

search=es.search(
q="The Query to ES.",
index="*logstash-*",
size=10,
search_type="scan",
scroll="5m",
)

Ora puoi scorrere quella query in un ciclo. Genera la nostra richiesta mentre lo facciamo.

 while True:
try:
# Git the next page of results.
scroll=es.scroll( scroll_id=search["_scroll_id"], scroll="5m", )
# Since scroll throws an error catch it and break the loop.
except elasticsearch.exceptions.NotFoundError:
break
# We have results initialize the bulk variable.
bulk = ""
for result in scroll["hits"]["hits"]:
bulk = bulk + "{ "delete" : { "_index" : "" + str(result["_index"]) + "", "_type" : "" + str(result["_type"]) + "", "_id" : "" + str(result["_id"]) + "" } }n"
# Finally do the deleting.
es.bulk( body=bulk )

Per utilizzare l'API bulk è necessario garantire due cose:

  1. Il documento è identificato Vuoi aggiornarlo. (indice, tipo, id)
  2. Ogni richiesta è terminata con una nuova riga o / n.

4 per risposta № 2

Il elasticsearch-py bulk API ti consente di eliminare i record in blocco includendo "_op_type": "delete" in ogni record. Tuttavia, se si desidera eliminare per query, è comunque necessario effettuare due query: una per recuperare i record da eliminare e un'altra per eliminarli.

Il modo più semplice per fare questo in massa è usare i moduli python scan() helper, che avvolge l'API Scroll di ElasticSearch in modo da non doverne tenere traccia _scroll_idS. Usalo con bulk() aiutante come sostituto del deprecato delete_by_query():

from elasticsearch.helpers import bulk, scan

bulk_deletes = []
for result in scan(es,
query=es_query_body,  # same as the search() body parameter
index=ES_INDEX,
doc_type=ES_DOC,
_source=False,
track_scores=False,
scroll="5m"):

result["_op_type"] = "delete"
bulk_deletes.append(result)

bulk(elasticsearch, bulk_deletes)

Da _source=False viene passato, il corpo del documento non viene restituito, quindi ogni risultato è piuttosto piccolo. Tuttavia, se si dispone di vincoli di memoria, è possibile eseguirne il batch abbastanza facilmente:

BATCH_SIZE = 100000

i = 0
bulk_deletes = []
for result in scan(...):

if i == BATCH_SIZE:
bulk(elasticsearch, bulk_deletes)
bulk_deletes = []
i = 0

result["_op_type"] = "delete"
bulk_deletes.append(result)

i += 1

bulk(elasticsearch, bulk_deletes)

4 per risposta № 3

Attualmente sto usando questo script basato sulla risposta di @drs, ma usando massa() aiutante costantemente. Ha la capacità di creare lotti di lavori da un iteratore usando chunk_size parametro (predefinito su 500, vedere straming_bulk () per maggiori informazioni).

from elasticsearch import Elasticsearch
from elasticsearch.helpers import scan, bulk

BULK_SIZE = 1000

def stream_items(es, query):
for e in scan(es,
query=query,
index=ES_INDEX,
doc_type=ES_DOCTYPE,
scroll="1m",
_source=False):

# There exists a parameter to avoid this del statement (`track_source`) but at my version it doesn"t exists.
del e["_score"]
e["_op_type"] = "delete"
yield e

es = Elasticsearch(host="localhost")
bulk(es, stream_items(es, query), chunk_size=BULK_SIZE)

1 per risposta № 4

Grazie, è stato davvero utile!

Ho due suggerimenti:

  1. Quando ottieni la pagina successiva dei risultati con scroll, es.scroll(scroll_id=search["_scroll_id"]) dovrebbe essere il _scroll_id restituito nell'ultimo scorrimento, non quello restituito dalla ricerca. Elasticsearch non aggiorna l'ID di scorrimento ogni volta, specialmente con richieste più piccole (vedi questa discussione), quindi questo codice potrebbe funzionare, ma non è infallibile.

  2. È importante cancellare le pergamene poiché mantenere i contesti di ricerca aperti per un lungo periodo ha un costo. Clear Scroll API - Documentazione dell'API Elasticsearch Si chiuderanno alla fine dopo il timeout, ma se si è bassi sullo spazio del disco, ad esempio, si può risparmiare un sacco di mal di testa.

Un modo semplice è quello di creare un elenco di scroll ID in movimento (assicurati di eliminare i duplicati!) E di cancellare tutto alla fine.

es.clear_scroll(scroll_id=scroll_id_list)