
How to query an Elasticsearch index using Pyspark and Dataframes

Elasticsearch's documentation only covers loading a complete index into Spark.

from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)
df = sqlContext.read.format("org.elasticsearch.spark.sql").load("index/type")
df.printSchema()

How can you perform a query against an Elasticsearch index and load the results into Spark as a DataFrame using pyspark?

asked Jul 02 '16 17:07 by George Lydakis

2 Answers

Below is how I do it.

General environment settings and command:

export SPARK_HOME=/home/ezerkar/spark-1.6.0-bin-hadoop2.6
export PYSPARK_DRIVER_PYTHON=ipython2

./spark-1.6.0-bin-hadoop2.6/bin/pyspark --driver-class-path=/home/eyald/spark-1.6.0-bin-hadoop2.6/lib/elasticsearch-hadoop-2.3.1.jar

Code:

from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext

conf = SparkConf().setAppName("ESTest")
sc = SparkContext(conf=conf)
sqlContext = SQLContext(sc)

q = """{
  "query": {
    "filtered": {
      "filter": {
        "exists": {
          "field": "label"
        }
      },
      "query": {
        "match_all": {}
      }
    }
  }
}"""
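A note on the query itself: `filtered` queries work on the Elasticsearch 1.x/2.x clusters this answer targets (it uses elasticsearch-hadoop 2.3.1), but `filtered` was removed in Elasticsearch 5.0. On a newer cluster the equivalent `bool` form would be (a sketch, assuming ES 5.0+; the implicit `match_all` can be dropped):

```json
{
  "query": {
    "bool": {
      "filter": {
        "exists": {
          "field": "label"
        }
      }
    }
  }
}
```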

es_read_conf = {
    "es.nodes" : "localhost",
    "es.port" : "9200",
    "es.resource" : "titanic/passenger",
    "es.query" : q
}

es_rdd = sc.newAPIHadoopRDD(
    inputFormatClass="org.elasticsearch.hadoop.mr.EsInputFormat",
    keyClass="org.apache.hadoop.io.NullWritable", 
    valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable", 
    conf=es_read_conf)

sqlContext.createDataFrame(es_rdd).collect()
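Since the question asks specifically about DataFrames, the same `es.query` setting can also be passed to the DataFrame reader directly, avoiding the Hadoop RDD detour. A minimal sketch, assuming the elasticsearch-hadoop jar is on the driver class path; the Spark calls are commented out because they need a live Elasticsearch cluster:

```python
# Sketch: pass the query to the DataFrame reader instead of newAPIHadoopRDD.
q = """{"query": {"match_all": {}}}"""  # or the filtered query above

es_options = {
    "es.nodes": "localhost",
    "es.port": "9200",
    "es.query": q,
}

# df = (sqlContext.read.format("org.elasticsearch.spark.sql")
#       .options(**es_options)
#       .load("titanic/passenger"))
# df.printSchema()
```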

You can also define the DataFrame columns explicitly; see the elasticsearch-hadoop documentation for more info.

Hope that it helps!

answered Nov 18 '22 12:11 by Eyal.Dahari

I am running my code in an Amazon EMR cluster using pyspark. I made it work by following these steps:

1) Add this bootstrap action when creating the cluster (it installs a localhost Elasticsearch server):

s3://awssupportdatasvcs.com/bootstrap-actions/elasticsearch/elasticsearch_install.4.0.0.rb

2) I ran these commands to populate the Elasticsearch database with some data:

curl -XPUT "http://localhost:9200/movies/movie/1" -d '{
  "title": "The Godfather",
  "director": "Francis Ford Coppola",
  "year": 1972
}'

You can also run other curl commands if you wish; quote the URL so the shell does not treat the & as a command separator:

curl -XGET "http://localhost:9200/_search?pretty=true&q={'matchAll':{''}}"

3) I started pyspark with the following parameters:

pyspark --driver-memory 5G --executor-memory 10G --executor-cores 2 --jars=elasticsearch-hadoop-5.5.1.jar

I had previously downloaded the Elasticsearch Python client.

4) I ran the following code:

from pyspark import SparkConf
from pyspark.sql import SQLContext
# sc and sqlContext are already provided by the pyspark shell

q = """{
  "query": {
    "match_all": {}
  }
}"""

es_read_conf = {
    "es.nodes" : "localhost",
    "es.port" : "9200",
    "es.resource" : "movies/movie",
    "es.query" : q
}

es_rdd = sc.newAPIHadoopRDD(
    inputFormatClass="org.elasticsearch.hadoop.mr.EsInputFormat",
    keyClass="org.apache.hadoop.io.NullWritable", 
    valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable", 
    conf=es_read_conf)

sqlContext.createDataFrame(es_rdd).collect()
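Note that collect() above returns (doc_id, document) pairs rather than flat rows. A small sketch of pulling out just the document bodies before building a DataFrame with named columns; the helper name is mine, and the Spark lines are commented out because they need the running es_rdd/sqlContext from above:

```python
# The pairs returned by EsInputFormat are (doc_id, document) tuples; for a
# DataFrame you usually want only the document bodies.

def extract_source(pair):
    """Drop the Elasticsearch document id and keep the source document."""
    doc_id, source = pair
    return source

sample = ("1", {"title": "The Godfather", "year": 1972})
extract_source(sample)  # {'title': 'The Godfather', 'year': 1972}

# movies = es_rdd.map(extract_source)
# df = sqlContext.createDataFrame(
#     movies.map(lambda d: (d.get("title"), d.get("year"))),
#     ["title", "year"])
```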

The command then returned the expected results.

answered Nov 18 '22 11:11 by lfvv