Elasticsearch + Apache Spark performance

I am trying to use Apache Spark to query my data in Elasticsearch, but my Spark job has been running an aggregation for about 20 hours and is still not finished. The same query in ES takes about 6 seconds.

I understand that the data has to move from the Elasticsearch cluster to my Spark cluster, and that some data shuffling happens in Spark.

The ES index holds approximately 300 million documents, and each document has about 400 fields (1.4 terabytes in total).

I've got a 3-node Spark cluster (1 master, 2 workers) with 60 GB of memory and 8 cores in total.

The time it takes to run is not acceptable. Is there a way to make my Spark job run faster?

Here is my Spark configuration:

SparkConf sparkConf = new SparkConf(true).setAppName("SparkQueryApp")
        .setMaster("spark://10.0.0.203:7077")
        .set("es.nodes", "10.0.0.207")
        .set("es.cluster", "wp-es-reporting-prod")
        .setJars(JavaSparkContext.jarOfClass(Demo.class))
        .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
        .set("spark.default.parallelism", String.valueOf(cpus * 2)) // cpus: core count, defined elsewhere
        .set("spark.executor.memory", "8g");

Edit:

SparkContext sparkCtx = new SparkContext(sparkConf);
SQLContext sqlContext = new SQLContext(sparkCtx);

// Load the ES index as a DataFrame via the elasticsearch-hadoop connector
DataFrame df = JavaEsSparkSQL.esDF(sqlContext, "customer-rpts01-201510/sample");

// cleanSchema is a user-defined helper (not shown) that reworks the schema
DataFrame dfCleaned = cleanSchema(sqlContext, df);
dfCleaned.registerTempTable("RPT");

DataFrame sqlDFTest = sqlContext.sql("SELECT agent, count(request_type) FROM RPT GROUP BY agent");

// collect() pulls the aggregated rows back to the driver
for (Row row : sqlDFTest.collect()) {
    System.out.println(">> " + row);
}
asked Jul 17 '15 by Adetiloye Philip Kehinde
2 Answers

I figured out what was going on. Basically, I was trying to manipulate the DataFrame schema because I have some fields with a dot in the name, e.g. user.firstname. This seems to cause a problem in the collect phase of Spark. To resolve it, I had to re-index my data so the fields use an underscore instead of a dot, e.g. user_firstname.
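If re-indexing is not an option, a Spark-side workaround might be to rename the dotted columns up front. This is a minimal, untested sketch assuming the Spark 1.x Java DataFrame API; `withColumnRenamed` may itself trip over dotted names on some versions, which is why re-indexing was the reliable fix for me.

// Sketch (untested assumption): replace dots in column names with underscores
// so Spark SQL no longer interprets them as struct field accessors.
DataFrame renamed = df;
for (String col : df.columns()) {
    if (col.contains(".")) {
        renamed = renamed.withColumnRenamed(col, col.replace('.', '_'));
    }
}
renamed.registerTempTable("RPT");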

answered Sep 20 '22 by Adetiloye Philip Kehinde

I'm afraid you can't perform a group by over 1.4 TB of data with only 120 GB of total RAM and expect good performance. The DataFrame will try to load all the data into memory/disk, and only then will it perform the group by. I don't think that at the moment the Spark/ES connector translates SQL syntax into the ES query language, so the aggregation is not pushed down to Elasticsearch.
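What you can do is reduce how much data leaves ES in the first place. As a sketch, assuming the elasticsearch-hadoop `es.read.field.include` setting is available in your connector version, project only the two fields the GROUP BY touches instead of all ~400:

// Sketch: fetch only the fields the aggregation needs, not all ~400 per document.
// "es.read.field.include" is an elasticsearch-hadoop setting; verify it against
// your connector version before relying on it.
SparkConf tunedConf = new SparkConf(true).setAppName("SparkQueryApp")
        .setMaster("spark://10.0.0.203:7077")
        .set("es.nodes", "10.0.0.207")
        .set("es.read.field.include", "agent,request_type");

If that is still too slow, consider running the terms aggregation directly in ES (where it already takes ~6 s) and loading only the result into Spark.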

answered Sep 20 '22 by axlpado - Agile Lab