
How to create request body for Python Elasticsearch mSearch

I'm trying to run a multi search request with the Elasticsearch Python client. I can run a single search correctly but can't figure out how to format the request for an msearch. According to the documentation, the body of the request needs to be formatted as:

The request definitions (metadata-search request definition pairs), as either a newline separated string, or a sequence of dicts to serialize (one per row).

What's the best way to create this request body? I've been searching for examples but can't seem to find any.

kevin.w.johnson asked Feb 16 '15 16:02



3 Answers

If you follow the demo in the official docs (even though it is written for the Bulk API), you can see how to construct the request in Python with the Elasticsearch client:

Here is the newline separated string way:

import json

def msearch():
    es = get_es_instance()

    search_arr = []
    # req_head
    search_arr.append({'index': 'my_test_index', 'type': 'doc_type_1'})
    # req_body
    search_arr.append({"query": {"term" : {"text" : "bag"}}, 'from': 0, 'size': 2})

    # req_head
    search_arr.append({'index': 'my_test_index', 'type': 'doc_type_2'})
    # req_body
    search_arr.append({"query": {"match_all" : {}}, 'from': 0, 'size': 2})

    # one JSON document per line, each line terminated by a newline
    request = ''
    for each in search_arr:
        request += '%s\n' % json.dumps(each)

    # as you can see, you just need to feed the <body> parameter,
    # and don't need to specify the <index> and <doc_type> as usual 
    resp = es.msearch(body = request)

As you can see, the final request is built from several request units. Each unit has the following structure:

request_header (search control: index name, optional mapping types, search type, etc.)\n
request_body (the query details for this request)\n
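
The unit layout above can be sketched as a small helper that serializes header/body pairs into the newline-delimited string. This is only an illustration: `build_msearch_body` is a hypothetical name, not part of the Elasticsearch client, and the index and query values are placeholders.

```python
import json


def build_msearch_body(pairs):
    """Serialize (header, body) dict pairs into a newline-delimited string.

    Hypothetical helper for illustration; not part of the Elasticsearch client.
    """
    lines = []
    for header, body in pairs:
        lines.append(json.dumps(header))  # search control: index, search type, ...
        lines.append(json.dumps(body))    # the query itself
    # Every line, including the last one, is terminated by a newline.
    return "".join(line + "\n" for line in lines)


pairs = [
    ({"index": "my_test_index"}, {"query": {"match_all": {}}, "size": 2}),
    ({"index": "my_test_index"}, {"query": {"term": {"text": "bag"}}, "size": 2}),
]
request = build_msearch_body(pairs)
```

The resulting string should be usable as the `body` argument in the same way as the hand-built string in the example above.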

The sequence-of-dicts way is almost the same as the previous one, except that you don't need to convert the request to a string:

def msearch():
    es = get_es_instance()

    request = []

    req_head = {'index': 'my_test_index', 'type': 'doc_type_1'}
    req_body = {
        'query': {'term': {'text' : 'bag'}}, 
        'from' : 0, 'size': 2  }
    request.extend([req_head, req_body])

    req_head = {'index': 'my_test_index', 'type': 'doc_type_2'}
    req_body = {
        'query': {'range': {'price': {'gte': 100, 'lt': 300}}},
        'from' : 0, 'size': 2  }
    request.extend([req_head, req_body])

    resp = es.msearch(body = request)

Here is the structure it returns. Read more about msearch.
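
As a rough sketch of reading the result: another answer in this thread indexes the response as `resp['responses']`, with one entry per search. The helper below assumes that shape and that entries come back in request order, and additionally assumes that a failed search shows up as an entry carrying an `error` key instead of `hits`. `hits_per_search` is a hypothetical name, and the sample response is hand-written for illustration, not real Elasticsearch output.

```python
def hits_per_search(resp):
    """Return one list of hits per search, in request order.

    Hypothetical helper. Assumes resp["responses"] holds one entry per
    search, and that a failed search has an "error" key instead of "hits".
    """
    results = []
    for item in resp["responses"]:
        if "error" in item:
            results.append([])  # keep positions aligned with the requests
        else:
            results.append(item["hits"]["hits"])
    return results


# Hand-written sample in the assumed shape (not real Elasticsearch output).
sample = {
    "responses": [
        {"hits": {"total": 1, "hits": [{"_id": "1", "_source": {"text": "bag"}}]}},
        {"error": "example failure"},
    ]
}
per_search = hits_per_search(sample)
```

Keeping an empty list for a failed search means the n-th result still lines up with the n-th request.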

Lyfing answered Oct 16 '22 17:10



If you are using elasticsearch-dsl, you can use the class MultiSearch.

Example from the documentation:

from elasticsearch_dsl import MultiSearch, Search

ms = MultiSearch(index='blogs')

ms = ms.add(Search().filter('term', tags='python'))
ms = ms.add(Search().filter('term', tags='elasticsearch'))

responses = ms.execute()

for response in responses:
    print("Results for query %r." % response.search.query)
    for hit in response:
        print(hit.title)
tsauerwein answered Oct 16 '22 17:10



Here is what I came up with. I am using the same document type and index so I optimized the code to run multiple queries with the same header:

import json
import logging
import time

from elasticsearch import Elasticsearch
from elasticsearch import exceptions as es_exceptions

RETRY_ATTEMPTS = 10
RECONNECT_SLEEP_SECS = 0.5

def msearch(es_conn, queries, index, doc_type, retries=0):
    """
    Es multi-search query
    :param queries: list of dict, es queries
    :param index: str, index to query against
    :param doc_type: str, defined doc type i.e. event
    :param retries: int, current retry attempt
    :return: list, found docs
    """
    search_header = json.dumps({'index': index, 'type': doc_type})
    request = ''
    for q in queries:
        # request head, body pairs
        request += '{}\n{}\n'.format(search_header, json.dumps(q))
    try:
        resp = es_conn.msearch(body=request, index=index)
        found = [r['hits']['hits'] for r in resp['responses']]
    except (es_exceptions.ConnectionTimeout, es_exceptions.ConnectionError,
            es_exceptions.TransportError):  # pragma: no cover
        logging.warning("msearch connection failed, retrying...")  # Retry on timeout
        if retries > RETRY_ATTEMPTS:  # pragma: no cover
            raise
        time.sleep(RECONNECT_SLEEP_SECS)
        # pass every required argument again when retrying
        found = msearch(es_conn, queries, index, doc_type, retries=retries + 1)
    except Exception as e:  # pragma: no cover
        logging.critical("msearch error {} on query {}".format(e, queries))
        raise
    return found

es_conn = Elasticsearch()
queries = []
queries.append(
    {"min_score": 2.0, "query": {"bool": {"should": [{"match": {"name.tokenized": {"query": "batman"}}}]}}}
)
queries.append(
    {"min_score": 1.0, "query": {"bool": {"should": [{"match": {"name.tokenized": {"query": "ironman"}}}]}}}
)
queries.append(
    {"track_scores": True, "min_score": 9.0, "query":
        {"bool": {"should": [{"match": {"name": {"query": "not-findable"}}}]}}}
)
q_results = msearch(es_conn, queries, index='pipeliner_current', doc_type='event')

This may be what some of you are looking for if you want to do multiple queries on the same index and doc type.
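
The same shared-header idea also works with the sequence-of-dicts form of the body, which skips the manual `json.dumps` step. A minimal sketch, assuming one header is reused for every query; `shared_header_body` is a hypothetical helper name, and the index name and simplified queries are borrowed from the example above.

```python
def shared_header_body(queries, header):
    """Interleave one shared header with each query: [h, q1, h, q2, ...].

    Hypothetical helper for illustration; the resulting list is meant to be
    passed as the msearch body and serialized by the client.
    """
    body = []
    for query in queries:
        body.append(dict(header))  # copy so entries do not share one object
        body.append(query)
    return body


header = {"index": "pipeliner_current"}
queries = [
    {"query": {"match": {"name": {"query": "batman"}}}},
    {"query": {"match": {"name": {"query": "ironman"}}}},
]
body = shared_header_body(queries, header)
# body could then be passed as es_conn.msearch(body=body)
```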

radtek answered Oct 16 '22 17:10
