Spark: optimized join with ElasticSearch index


So I'm learning to pull data from Elasticsearch through Apache Spark. Let's say I've connected to an Elasticsearch cluster that has a 'users' index.

from pyspark.sql import SQLContext

sqlContext = SQLContext(sc)
usersES = sqlContext.read.format('org.elasticsearch.spark.sql').option('es.nodes', 'mynode').load('users/user')

usersES.explain() shows me this:

== Physical Plan ==
Scan ElasticsearchRelation(Map(es.nodes -> mynode, es.resource -> users/user),org.apache.spark.sql.SQLContext@6c78e806,None)[about#145,activities#146,bdate#147,uid#148]

When I use a filter:

usersES.filter(usersES.uid==1566324).explain()

== Physical Plan ==
Filter (uid#203L = 1566324)
+- Scan ElasticsearchRelation(Map(es.nodes -> mynode, es.resource -> users/user),org.apache.spark.sql.SQLContext@6c78e806,None)[about#145,activities#146,bdate#147,uid#148] PushedFilters: [EqualTo(uid,1566324)]

As you can see, Spark elegantly pushes the filter down to Elasticsearch, making the index search fast and cheap.

But when I try joining usersES with another DataFrame, I run into the same issue every time: Spark scans the whole Elasticsearch index instead of pushing down any of the filters I give it. For example:

from pyspark.sql import Row

a = sc.parallelize([1566324, 1566329]).map(Row('id')).toDF()
a.join(usersES, usersES.uid == a.id).explain()

shows:

== Physical Plan ==
SortMergeJoin [id#210L], [uid#203L]
:- Sort [id#210L ASC], false, 0
:  +- TungstenExchange hashpartitioning(id#210L,200), None
:     +- ConvertToUnsafe
:        +- Scan ExistingRDD[id#210L]
+- Sort [uid#203L ASC], false, 0
   +- TungstenExchange hashpartitioning(uid#203L,200), None
      +- ConvertToUnsafe
         +- Scan ElasticsearchRelation(Map(es.nodes -> mynode, es.resource -> users/user),org.apache.spark.sql.SQLContext@6c78e806,None)[about#145,activities#146,bdate#147,uid#148]

Please tell me: is it possible to push the filter down to Elasticsearch inside the join?

Dmitry Zuev asked Aug 04 '16

1 Answer

This is expected behavior: yes, the elasticsearch-hadoop connector supports predicate pushdown, but there is no pushdown when you join.

This is because the join operation does not know anything about how the keys are partitioned across your DataFrames.

By default, this operation will hash all the keys of both dataframes, sending all the elements with the same key hash across the network to the same machine, and then join together the elements with the same key on that machine.

And that's why you get that execution plan without the predicate being pushed down.

EDIT: It seems the connector has supported the IN clause since version 2.1. You ought to use that if your DataFrame a isn't big; see the sketch after the references below.

  • Ref 1. https://github.com/elastic/elasticsearch-hadoop/issues/556
  • Ref 2. https://github.com/elastic/elasticsearch-hadoop/commit/bbe6154226f4700a9ac23c3d611b6c313bfbaf59
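For completeness, a minimal sketch of that workaround, assuming the usersES and a DataFrames from the question and a connector version that pushes down the IN filter:

# Collect the ids of the small DataFrame a to the driver.
ids = [row.id for row in a.collect()]

# isin() compiles to an In(uid, [...]) filter, which the connector can push
# down to Elasticsearch as a terms query instead of scanning the whole index.
filtered = usersES.filter(usersES.uid.isin(ids))
filtered.explain()  # the plan should now show PushedFilters: [In(uid,...)]

# The join then only shuffles the documents matched by the pushed filter.
filtered.join(a, filtered.uid == a.id).explain()

Collecting a to the driver is only reasonable when it is small, which is exactly the caveat above.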
eliasah answered Oct 11 '22