I am using the scroll method to get a large number of events in batches. I do not know how to properely stop the scrolling.
What I am doing right now (it works) is to check for a TransportError
which signals a failed scroll attempt:
scanResp= es.search(
index="nessus_all",
doc_type="marker",
body={"query": {"match_all": {}}},
search_type="scan",
scroll="10m"
)
scrollId= scanResp['_scroll_id']
while True:
try:
response = es.scroll(scroll_id=scrollId, scroll= "10m")
# process results
except Exception as e:
log.debug("ended scroll: {e}".format(e=e))
break
# we are done with the search
This generates an error in /var/log/elasticsearch/security.log
:
[2015-02-16 09:36:07,110][DEBUG][action.search.type ] [eu4] [2791] Failed to execute query phase
org.elasticsearch.transport.RemoteTransportException: [eu5][inet[/10.81.147.186:9300]][indices:data/read/search[phase/scan/scroll]]
Caused by: org.elasticsearch.search.SearchContextMissingException: No search context found for id [2791]
at org.elasticsearch.search.SearchService.findContext(SearchService.java:502)
at org.elasticsearch.search.SearchService.executeScan(SearchService.java:236)
at org.elasticsearch.search.action.SearchServiceTransportAction$SearchScanScrollTransportHandler.messageReceived(SearchServiceTransportAction.java:939)
at org.elasticsearch.search.action.SearchServiceTransportAction$SearchScanScrollTransportHandler.messageReceived(SearchServiceTransportAction.java:930)
at org.elasticsearch.transport.netty.MessageChannelHandler$RequestHandler.run(MessageChannelHandler.java:275)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
and generally does not seem the right way to go?
According to the Elasticsearch's Scroll documentation (as of version 5.1):
Each call to the scroll API returns the next batch of results until there are no more results left to return, ie the hits array is empty.
So, I believe that the best method would be to check for len(response['hits']['hits'])
.
A more concrete example:
response = es.search(
index='index_name',
body=<your query here>,
scroll='10m'
)
scroll_id = response['_scroll_id']
while len(response['hits']['hits']):
response = es.scroll(scroll_id=scroll_id, scroll='10m')
# process results
After looking more closely at .scroll()
I came up with
scanResp= es.search(
index="nessus_all",
doc_type="marker",
body={"query": {"match_all": {}}},
search_type="scan",
scroll="10m"
)
scrollId= scanResp['_scroll_id']
totalhits = scanResp['hits']['total']
while totalhits > 0:
response = es.scroll(scroll_id=scrollId, scroll= "10m")
# process results
totalhits -= len(response['hits']['hits'])
# we are done with the search
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With