
How to execute a SQL query against ElasticSearch (using org.elasticsearch.spark.sql format)?

With Spark 1.6.0 and ES 5.0.0-alpha5 I'm trying to load a DataFrame in Spark. I'm using the package

--packages org.elasticsearch:elasticsearch-spark-13_2.10:5.0.0-alpha5

With

sqlContext.read.format("org.elasticsearch.spark.sql").option("es.nodes", "my_host").load("logstash-2016.10.10/my_type")

I can do printSchema and see my fields.

However, any action (count, first, etc.) just hangs forever: no exception, and no task seems to be submitted to Spark.

How can I debug this? Any hint would be appreciated!

Edit: I'm running Elasticsearch in a Docker container, to which I can also successfully connect via Kibana and direct HTTP REST queries.

asked Oct 10 '16 by Cedric H.


3 Answers

In my case, I am running the ElasticSearch and Spark instances as Docker containers within Google Cloud. After reading and searching everywhere, I came across this: https://www.elastic.co/guide/en/elasticsearch/hadoop/master/cloud.html. The ES Spark connector has a setting called es.nodes.wan.only which makes it work in cloud environments such as Google Cloud or AWS. I had exactly the same problem as the OP (printSchema worked but any aggregates did not), and this one setting fixed it for me!

You can configure it when initialising your SparkConf, like this:

val conf = new SparkConf()
  .setAppName("MySparkModel")
  .set("es.nodes.wan.only", "true")
answered by Jeroen


DISCLAIMER: I've got no experience with ElasticSearch, so mistakes are quite possible.

I'm using the very latest unreleased Spark version, 2.1.0-SNAPSHOT, which I built this morning.

$ ./bin/spark-shell --version
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.1.0-SNAPSHOT
      /_/

Using Scala version 2.11.8, Java HotSpot(TM) 64-Bit Server VM, 1.8.0_112
Branch master
Compiled by user jacek on 2016-12-02T05:06:30Z
Revision a5f02b00291e0a22429a3dca81f12cf6d38fea0b
Url https://github.com/apache/spark.git
Type --help for more information.

I'm using ElasticSearch 5.0.2, which I installed following the official Installation Steps documentation.

I ran an ES instance using ./bin/elasticsearch, with no changes to the default configuration.

$ curl http://localhost:9200/
{
  "name" : "5PW1rOj",
  "cluster_name" : "elasticsearch",
  "cluster_uuid" : "nqHBMN7JTw2j8_FD0FZpDg",
  "version" : {
    "number" : "5.0.2",
    "build_hash" : "f6b4951",
    "build_date" : "2016-11-24T10:07:18.101Z",
    "build_snapshot" : false,
    "lucene_version" : "6.2.1"
  },
  "tagline" : "You Know, for Search"
}

I found the elasticsearch-hadoop module for Spark. They had just released 5.0.2, so (after some reading about how to work with Spark and ES) I ran spark-shell as follows:

$ ./bin/spark-shell --packages org.elasticsearch:elasticsearch-spark-20_2.11:5.0.2

Please note that I knew and still know nothing about ElasticSearch.

When spark-shell was up I executed the following:

scala> import org.elasticsearch.spark.sql._
import org.elasticsearch.spark.sql._

scala> (0 to 10).toDF.saveToEs("spark/helloworld")

The Spark job executed properly, as I could see in the logs of ElasticSearch:

[2016-12-02T21:23:02,628][INFO ][o.e.c.m.MetaDataMappingService] [5PW1rOj] [spark/jXB6Km6xSjuHxitxjj6Ebw] create_mapping [helloworld]

After playing with ES for some time I ended up with the following index:

$ http http://localhost:9200/spark
HTTP/1.1 200 OK
content-encoding: gzip
content-type: application/json; charset=UTF-8
transfer-encoding: chunked

{
    "spark": {
        "aliases": {},
        "mappings": {
            "hello": {
                "properties": {
                    "value": {
                        "type": "long"
                    }
                }
            },
            "hello2": {
                "properties": {
                    "value": {
                        "type": "long"
                    }
                }
            },
            "helloworld": {
                "properties": {
                    "value": {
                        "type": "long"
                    }
                }
            }
        },
        "settings": {
            "index": {
                "creation_date": "1480709570254",
                "number_of_replicas": "1",
                "number_of_shards": "5",
                "provided_name": "spark",
                "uuid": "jXB6Km6xSjuHxitxjj6Ebw",
                "version": {
                    "created": "5000299"
                }
            }
        }
    }
}

I then tried to look up a document in spark/helloworld by id; it returned 404 because the documents were saved with auto-generated ids, so there is no document with id 7:

$ http http://localhost:9200/spark/helloworld/7
HTTP/1.1 404 Not Found
content-encoding: gzip
content-type: application/json; charset=UTF-8
transfer-encoding: chunked

{
    "_id": "7",
    "_index": "spark",
    "_type": "helloworld",
    "found": false
}

And last but not least, Spark's web UI shows the job for the ElasticSearch query.

[Screenshot: Spark's web UI with a job for ElasticSearch]

It appears to work very well out of the box, with no changes to ElasticSearch or the module.

Ah, I almost forgot about count:

scala> spark.read.format("es").load("spark/helloworld").count
res4: Long = 22

[Screenshot: count in Spark's web UI]

answered by Jacek Laskowski


The question is quite old, but I will describe the approach I used to get ES 5.0 working with Spark 2.0, for future reference. I think the ES-Hadoop documentation is a bit unclear on which artifact and API to use.

I used org.elasticsearch:elasticsearch-spark-20_2.11:5.0 and the following code:

// add to your class imports
import org.apache.spark.sql.SparkSession
import org.elasticsearch.spark.sql._

// Use the Spark 2.0 SparkSession object to provide your config
val sparkSession = SparkSession.builder().config(...).getOrCreate()
// Optional step, imports things like $"column"
import sparkSession.implicits._
// Specify your index and type in ES
val df = sparkSession.esDF("index/type")
// Perform an action
df.count()

I assume the situation is quite similar for Spark 1.6 with some minor changes. In particular, you should use SQLContext or HiveContext instead of SparkSession.
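
As a rough, untested sketch of that Spark 1.6 variant (the app name, host and index/type below are placeholders, not from my actual setup):

// Untested Spark 1.6 sketch: SQLContext instead of SparkSession
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
import org.elasticsearch.spark.sql._  // brings esDF into scope

val conf = new SparkConf()
  .setAppName("EsExample")            // placeholder app name
  .set("es.nodes", "my_host")         // your ES host
  .set("es.nodes.wan.only", "true")   // often needed when ES runs in Docker or the cloud
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)

// Specify your index and type in ES, then perform an action
val df = sqlContext.esDF("index/type")
df.count()

This would pair with the Spark 1.x artifact (elasticsearch-spark-13_2.10) rather than elasticsearch-spark-20_2.11.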

answered by Anton Okolnychyi