Index a pandas dataframe into Elasticsearch without elasticsearch-py

I would like to index a bunch of large pandas dataframes (some million rows and 50 columns) into Elasticsearch.

When looking for examples on how to do this, most people will use elasticsearch-py's bulk helper method, passing it an instance of the Elasticsearch class which handles the connection as well as a list of dictionaries which is created with pandas' dataframe.to_dict(orient='records') method. Metadata can be inserted into the dataframe beforehand as new columns, e.g. df['_index'] = 'my_index' etc.
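
For reference, that commonly suggested approach looks roughly like the sketch below (host, index and type names are placeholders):

import pandas as pd
from elasticsearch import Elasticsearch, helpers

es = Elasticsearch(['http://localhost:9200'])
df = pd.DataFrame.from_records([{'a': 1, 'b': 2}, {'a': 3, 'b': 4}])
df['_index'] = 'my_index'  # metadata columns that the bulk helper picks up
df['_type'] = 'my_type'
helpers.bulk(es, df.to_dict(orient='records'))  # each dict becomes one bulk action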

However, I have reasons not to use the elasticsearch-py library and would like to talk to the Elasticsearch bulk API directly, e.g. via requests or another convenient HTTP library. Besides, df.to_dict() is very slow on large dataframes, unfortunately, and converting a dataframe to a list of dicts which is then serialized to JSON by elasticsearch-py sounds like unnecessary overhead when there is something like dataframe.to_json() which is pretty fast even on large dataframes.

What would be an easy and quick approach of getting a pandas dataframe into the format required by the bulk API? I think a step in the right direction is using dataframe.to_json() as follows:

import pandas as pd
df = pd.DataFrame.from_records([{'a': 1, 'b': 2}, {'a': 3, 'b': 4}, {'a': 5, 'b': 6}])
df
   a  b
0  1  2
1  3  4
2  5  6
df.to_json(orient='records', lines=True)
'{"a":1,"b":2}\n{"a":3,"b":4}\n{"a":5,"b":6}'

This is now a newline-separated JSON string, however, it is still lacking the metadata. What would be a performing way to get it in there?

Edit: For completeness, a metadata JSON document would look like this:

{"index": {"_index": "my_index", "_type": "my_type"}}

Hence, in the end the whole JSON expected by the bulk API would look like this (with an additional linebreak after the last line):

{"index": {"_index": "my_index", "_type": "my_type"}}
{"a":1,"b":2}
{"index": {"_index": "my_index", "_type": "my_type"}}
{"a":3,"b":4}
{"index": {"_index": "my_index", "_type": "my_type"}}
{"a":5,"b":6}
Asked Jan 03 '17 by Dirk


2 Answers

In the meantime I have found several ways to do this with at least reasonable speed:

import json
import pandas as pd
import requests

# df is a dataframe or dataframe chunk coming from your reading logic
df['_id'] = df['column_1'] + '_' + df['column_2'] # or whatever makes your _id
df_as_json = df.to_json(orient='records', lines=True)

final_json_string = ''
for json_document in df_as_json.split('\n'):
    jdict = json.loads(json_document)
    metadata = json.dumps({'index': {'_id': jdict['_id']}})
    jdict.pop('_id')
    final_json_string += metadata + '\n' + json.dumps(jdict) + '\n'

headers = {'Content-type': 'application/json', 'Accept': 'text/plain'}
r = requests.post('http://elasticsearch.host:9200/my_index/my_type/_bulk', data=final_json_string, headers=headers, timeout=60) 
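
Not shown above, but worth noting: the bulk API responds with a JSON body whose top-level "errors" flag indicates whether any individual action failed, so checking the response beyond the HTTP status code can be useful. A minimal check might look like this:

response = r.json()
if response.get('errors'):  # True if at least one action failed
    failed = [item for item in response['items'] if 'error' in item.get('index', {})]
    print('%d documents failed to index' % len(failed))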

Instead of using pandas' to_json() method, one could also use to_dict() as follows. This was slightly slower in my tests but not much:

dicts = df.to_dict(orient='records')
final_json_string = ''
for document in dicts:
    metadata = {"index": {"_id": document["_id"]}}
    document.pop('_id')
    final_json_string += json.dumps(metadata) + '\n' + json.dumps(document) + '\n'

When running this on large datasets, you can save a couple of minutes by replacing Python's default json library with ujson or rapidjson: install the package, then import ujson as json or import rapidjson as json, respectively.
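
For example, a drop-in swap that falls back to the standard library when ujson is not installed (assumes pip install ujson):

try:
    import ujson as json  # faster dumps/loads on large payloads
except ImportError:
    import json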

An even bigger speedup can be achieved by replacing the sequential execution of these steps with a parallel one, so that reading and converting do not stop while requests is waiting for Elasticsearch to process all documents and return a response. This could be done via threading, multiprocessing, asyncio, task queues, ... but that is beyond the scope of this question.
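
As a very rough illustration only (a sketch assuming a hypothetical read_chunks() generator that yields dataframe chunks and a build_bulk_body() helper wrapping the conversion loop above), a thread pool lets conversion of the next chunk proceed while earlier requests are still in flight:

from concurrent.futures import ThreadPoolExecutor
import requests

def post_bulk(body):
    # one bulk request per pre-built newline-delimited body
    headers = {'Content-type': 'application/json', 'Accept': 'text/plain'}
    return requests.post('http://elasticsearch.host:9200/my_index/my_type/_bulk',
                         data=body, headers=headers, timeout=60)

with ThreadPoolExecutor(max_workers=4) as pool:
    # build_bulk_body() runs in the main thread; posting happens in worker threads
    futures = [pool.submit(post_bulk, build_bulk_body(chunk)) for chunk in read_chunks()]
    for future in futures:
        future.result()  # re-raises if a request failed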

If you happen to find an approach to do the to-json-conversion even faster, let me know.

Answered Sep 30 '22 by Dirk


This function inserts a pandas dataframe into Elasticsearch (chunk by chunk):

import json
import requests

def insertDataframeIntoElastic(dataFrame, index='index', typ='test',
                               server='http://localhost:9200', chunk_size=2000):
    headers = {'content-type': 'application/x-ndjson', 'Accept-Charset': 'UTF-8'}
    records = dataFrame.to_dict(orient='records')
    # Build one action per row: metadata line followed by the document line
    actions = ['{ "index" : { "_index" : "%s", "_type" : "%s"} }\n' % (index, typ) + json.dumps(record)
               for record in records]
    i = 0
    while i < len(actions):
        serverAPI = server + '/_bulk'
        # Join chunk_size actions into one newline-delimited bulk body (trailing newline required)
        data = '\n'.join(actions[i:min([i + chunk_size, len(actions)])]) + '\n'
        r = requests.post(serverAPI, data=data, headers=headers)
        print(r.content)
        i = i + chunk_size
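
A call might then look like this (index, type and server are placeholders):

insertDataframeIntoElastic(df, index='my_index', typ='my_type',
                           server='http://localhost:9200', chunk_size=5000)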
Answered Sep 30 '22 by Ali Mirzaei