I'm trying to run a multi search request on the Elasticsearch Python client. I can run the singular search correctly but can't figure out how to format the request for a msearch. According to the documentation, the body of the request needs to be formatted as:
The request definitions (metadata-search request definition pairs), as either a newline separated string, or a sequence of dicts to serialize (one per row).
What's the best way to create this request body? I've been searching for examples but can't seem to find any.
If you follow the demo in the official docs (even though it's for the Bulk API), you will see how to construct your request in Python with the Elasticsearch client:
Here is the newline separated string way:
import json

def msearch():
    # get_es_instance() is assumed to be your own helper returning an Elasticsearch client
    es = get_es_instance()
    search_arr = []
    # req_head
    search_arr.append({'index': 'my_test_index', 'type': 'doc_type_1'})
    # req_body
    search_arr.append({"query": {"term": {"text": "bag"}}, 'from': 0, 'size': 2})
    # req_head
    search_arr.append({'index': 'my_test_index', 'type': 'doc_type_2'})
    # req_body
    search_arr.append({"query": {"match_all": {}}, 'from': 0, 'size': 2})

    request = ''
    for each in search_arr:
        # one JSON document per line, each terminated by a newline
        request += '%s\n' % json.dumps(each)

    # as you can see, you just need to feed the <body> parameter,
    # and don't need to specify the <index> and <doc_type> as usual
    resp = es.msearch(body=request)
As you can see, the final request is built from several request units. Each unit has the following structure:
request_header (search control: index name, optional mapping types, search type, etc.)\n
request_body (the query details for this request)\n
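The header/body pairing can be checked without a running cluster. Here is a minimal sketch using only `json.dumps`; `build_msearch_body` is a hypothetical helper name, not part of the client, and the index name is just the one used in this answer:

```python
import json


def build_msearch_body(pairs):
    """Serialize (header, body) dict pairs into a newline-delimited string."""
    lines = []
    for header, body in pairs:
        lines.append(json.dumps(header))  # request header: index, search type, ...
        lines.append(json.dumps(body))    # request body: the query itself
    # Every line, including the last one, is terminated by a newline.
    return ''.join(line + '\n' for line in lines)


pairs = [
    ({'index': 'my_test_index'}, {'query': {'term': {'text': 'bag'}}, 'size': 2}),
    ({'index': 'my_test_index'}, {'query': {'match_all': {}}, 'size': 2}),
]
body = build_msearch_body(pairs)
print(body)
```

The resulting string is what you would pass as `body` to `es.msearch`.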
The sequence-of-dicts way is almost the same as the previous one, except that you don't need to convert it to a string:
def msearch():
    es = get_es_instance()

    request = []

    req_head = {'index': 'my_test_index', 'type': 'doc_type_1'}
    req_body = {
        'query': {'term': {'text': 'bag'}},
        'from': 0, 'size': 2}
    request.extend([req_head, req_body])

    req_head = {'index': 'my_test_index', 'type': 'doc_type_2'}
    req_body = {
        'query': {'range': {'price': {'gte': 100, 'lt': 300}}},
        'from': 0, 'size': 2}
    request.extend([req_head, req_body])

    resp = es.msearch(body=request)
Here is the structure it returns. Read more about msearch.
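As a rough sketch of reading that structure: the response carries a `responses` list with one entry per sub-search, in request order, and a failed sub-search carries an `error` key instead of `hits`. The helper name `hits_per_query` and the sample dict below are made up for illustration; the sample is hand-written, not real cluster output.

```python
def hits_per_query(resp):
    """Return one list of hits per sub-search, or None where that search failed."""
    results = []
    for item in resp['responses']:
        if 'error' in item:
            results.append(None)  # this sub-search failed; the others may still be fine
        else:
            results.append(item['hits']['hits'])
    return results


# Hand-written sample shaped like an msearch response (illustrative only).
sample = {
    'responses': [
        {'hits': {'hits': [{'_id': '1', '_source': {'text': 'bag'}}]}, 'status': 200},
        {'error': {'type': 'index_not_found_exception'}, 'status': 404},
    ]
}
print(hits_per_query(sample))
```

Checking each entry separately matters because one bad sub-search does not make the whole msearch call fail.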
If you are using elasticsearch-dsl, you can use the class MultiSearch.
Example from the documentation:
from elasticsearch_dsl import MultiSearch, Search

ms = MultiSearch(index='blogs')

ms = ms.add(Search().filter('term', tags='python'))
ms = ms.add(Search().filter('term', tags='elasticsearch'))

responses = ms.execute()

for response in responses:
    print("Results for query %r." % response.search.query)
    for hit in response:
        print(hit.title)
Here is what I came up with. I am using the same document type and index so I optimized the code to run multiple queries with the same header:
import json
import logging
import time

from elasticsearch import Elasticsearch
from elasticsearch import exceptions as es_exceptions

RETRY_ATTEMPTS = 10
RECONNECT_SLEEP_SECS = 0.5

def msearch(es_conn, queries, index, doc_type, retries=0):
    """
    Es multi-search query
    :param es_conn: Elasticsearch, client connection
    :param queries: list of dict, es queries
    :param index: str, index to query against
    :param doc_type: str, defined doc type i.e. event
    :param retries: int, current retry attempt
    :return: list, found docs
    """
    search_header = json.dumps({'index': index, 'type': doc_type})
    request = ''
    for q in queries:
        # request head, body pairs
        request += '{}\n{}\n'.format(search_header, json.dumps(q))
    try:
        resp = es_conn.msearch(body=request, index=index)
        found = [r['hits']['hits'] for r in resp['responses']]
    except (es_exceptions.ConnectionTimeout, es_exceptions.ConnectionError,
            es_exceptions.TransportError):  # pragma: no cover
        logging.warning("msearch connection failed, retrying...")  # Retry on timeout
        if retries > RETRY_ATTEMPTS:  # pragma: no cover
            raise
        time.sleep(RECONNECT_SLEEP_SECS)
        # pass the connection and doc type along, otherwise the retry raises a TypeError
        found = msearch(es_conn, queries, index, doc_type, retries=retries + 1)
    except Exception as e:  # pragma: no cover
        logging.critical("msearch error {} on query {}".format(e, queries))
        raise
    return found
es_conn = Elasticsearch()
queries = []
queries.append(
    {"min_score": 2.0, "query": {"bool": {"should": [{"match": {"name.tokenized": {"query": "batman"}}}]}}}
)
queries.append(
    {"min_score": 1.0, "query": {"bool": {"should": [{"match": {"name.tokenized": {"query": "ironman"}}}]}}}
)
queries.append(
    {"track_scores": True, "min_score": 9.0, "query":
        {"bool": {"should": [{"match": {"name": {"query": "not-findable"}}}]}}}
)
q_results = msearch(es_conn, queries, index='pipeliner_current', doc_type='event')
This may be what some of you are looking for if you want to do multiple queries on the same index and doc type.
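If you would rather keep the retry logic out of the search function, one possible variation is a bounded loop instead of recursion. This is only a sketch: `call_with_retries` and `flaky_send` are hypothetical names, and the built-in `ConnectionError` stands in for the Elasticsearch exception classes so the example runs without a cluster.

```python
import time


def call_with_retries(send, attempts=3, delay_secs=0.0, retry_on=(Exception,)):
    """Call send() until it succeeds or the attempts are exhausted."""
    for attempt in range(1, attempts + 1):
        try:
            return send()
        except retry_on:
            if attempt == attempts:
                raise  # out of attempts: surface the last error
            time.sleep(delay_secs)


# Stand-in for something like `lambda: es_conn.msearch(body=request)`;
# it fails twice, then succeeds.
calls = {'count': 0}


def flaky_send():
    calls['count'] += 1
    if calls['count'] < 3:
        raise ConnectionError('simulated failure')
    return {'responses': []}


result = call_with_retries(flaky_send, attempts=5, retry_on=(ConnectionError,))
print(result, calls['count'])
```

With a real client you would pass the exception classes you want to retry on via `retry_on` and a non-zero `delay_secs`.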