I am trying to use Apache Spark to query my data in Elasticsearch, but my Spark job has been running an aggregation for about 20 hours and still hasn't finished. The same query in ES takes about 6 seconds.
I understand that the data has to move from the Elasticsearch cluster to my Spark cluster, and that Spark has to shuffle some of it.
My ES index holds approximately 300 million documents, each with about 400 fields (about 1.4 TB in total).
I've got a 3-node Spark cluster (1 master, 2 workers) with 60 GB of memory and 8 cores in total.
This run time is not acceptable. Is there a way to make my Spark job run faster?
Here is my Spark configuration:
```java
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

SparkConf sparkConf = new SparkConf(true).setAppName("SparkQueryApp")
        .setMaster("spark://10.0.0.203:7077")
        .set("es.nodes", "10.0.0.207")
        .set("es.cluster", "wp-es-reporting-prod")
        .setJars(JavaSparkContext.jarOfClass(Demo.class))
        .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
        // `cpus` is defined elsewhere in the original code (not shown)
        .set("spark.default.parallelism", String.valueOf(cpus * 2))
        .set("spark.executor.memory", "8g");
```
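```java
import org.apache.spark.SparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.elasticsearch.spark.sql.api.java.JavaEsSparkSQL;

SparkContext sparkCtx = new SparkContext(sparkConf);
SQLContext sqlContext = new SQLContext(sparkCtx);
DataFrame df = JavaEsSparkSQL.esDF(sqlContext, "customer-rpts01-201510/sample");
DataFrame dfCleaned = cleanSchema(sqlContext, df); // user-defined helper (not shown) that rewrites the schema
dfCleaned.registerTempTable("RPT");
DataFrame sqlDFTest = sqlContext.sql("SELECT agent, count(request_type) FROM RPT group by agent");
for (Row row : sqlDFTest.collect()) {
    System.out.println(">> " + row);
}
```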
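To make clear what this aggregation computes, here is a plain-Java sketch of the same logic over an in-memory list. Spark does this work distributed across the cluster, which is why every row has to reach Spark before it can be grouped. The `RptRow` type and the sample values are hypothetical, and this is not how Spark executes the query internally; it only illustrates the semantics, in particular that `count(request_type)` skips nulls.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical row type holding only the two fields the query touches.
final class RptRow {
    final String agent;        // assumed never null in this sketch
    final String requestType;  // may be null

    RptRow(String agent, String requestType) {
        this.agent = agent;
        this.requestType = requestType;
    }
}

final class GroupByCount {
    // In-memory equivalent of:
    //   SELECT agent, count(request_type) FROM RPT GROUP BY agent
    // COUNT(column) ignores NULLs: a row with a null request_type still creates
    // a group for its agent but does not increase that group's count.
    static Map<String, Long> countRequestTypeByAgent(List<RptRow> rows) {
        // TreeMap keeps the output sorted; it cannot hold a null key,
        // which is why agent is assumed to be non-null here.
        Map<String, Long> counts = new TreeMap<>();
        for (RptRow row : rows) {
            long add = (row.requestType == null) ? 0L : 1L;
            counts.merge(row.agent, add, Long::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        List<RptRow> rows = Arrays.asList(
                new RptRow("chrome", "GET"),
                new RptRow("chrome", "POST"),
                new RptRow("firefox", null),
                new RptRow("firefox", "GET"),
                new RptRow("curl", null));
        System.out.println(countRequestTypeByAgent(rows));
    }
}
```

Running `main` prints `{chrome=2, curl=0, firefox=1}`.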
I figured out what was going on. I was trying to manipulate the DataFrame schema because some of my fields contain a dot, e.g. user.firstname, and this seems to cause a problem in Spark's collect phase. To resolve it, I re-indexed my data so that the field names use an underscore instead of a dot, e.g. user_firstname.
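The renaming step of such a re-index could look roughly like the sketch below. It assumes each document's source is available as a nested `Map<String, Object>` before being written to the new index; the class and method names are hypothetical, and the code covers only the key rewrite, not reading from or writing to Elasticsearch.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class FieldRenamer {
    // Returns a copy of the document with every '.' in a field name replaced by '_'.
    // Values that are Maps (nested objects) are processed recursively;
    // all other values, including lists, are copied unchanged.
    // Note: if a document already has both "user.firstname" and "user_firstname",
    // the later entry overwrites the earlier one in the copy.
    static Map<String, Object> underscoreKeys(Map<String, Object> doc) {
        Map<String, Object> out = new LinkedHashMap<>(); // keeps original field order
        for (Map.Entry<String, Object> e : doc.entrySet()) {
            Object value = e.getValue();
            if (value instanceof Map) {
                @SuppressWarnings("unchecked")
                Map<String, Object> nested = (Map<String, Object>) value;
                value = underscoreKeys(nested);
            }
            out.put(e.getKey().replace('.', '_'), value);
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, Object> doc = new LinkedHashMap<>();
        doc.put("user.firstname", "Ada");
        doc.put("request_type", "GET");
        System.out.println(underscoreKeys(doc)); // prints {user_firstname=Ada, request_type=GET}
    }
}
```

The input map is not modified, so the original document can still be compared against the rewritten one.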
I'm afraid you can't perform a group by over 1.4 TB with only 120 GB of total RAM and get good performance. The DataFrame will try to load all the data into memory/disk and only then perform the group by. I don't think the Spark/ES connector currently translates SQL syntax into the ES query language.
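The sizing argument can be checked with some quick arithmetic on the figures from the question. This is a back-of-the-envelope sketch in decimal units (1 TB = 10^12 bytes), and the last estimate assumes every field contributes equally to document size, which real documents rarely do.

```java
import java.util.Locale;

final class DataVolumeEstimate {
    static final double TB = 1e12; // decimal units
    static final double GB = 1e9;

    // Average bytes per document, from total index size and document count.
    static double bytesPerDoc(double totalBytes, long docs) {
        return totalBytes / docs;
    }

    // How many times the data exceeds the cluster's memory.
    static double dataToMemoryRatio(double totalBytes, double memoryBytes) {
        return totalBytes / memoryBytes;
    }

    // Rough size if only `usedFields` of `totalFields` were read,
    // ASSUMING every field contributes equally to document size.
    static double projectedBytes(double totalBytes, int usedFields, int totalFields) {
        return totalBytes * usedFields / totalFields;
    }

    public static void main(String[] args) {
        double total = 1.4 * TB;   // index size from the question
        long docs = 300_000_000L;  // document count from the question
        // Locale.ROOT keeps '.' as the decimal separator regardless of system locale.
        System.out.printf(Locale.ROOT, "avg doc size: %.0f bytes%n", bytesPerDoc(total, docs));
        System.out.printf(Locale.ROOT, "data / 60 GB RAM: %.1fx%n", dataToMemoryRatio(total, 60 * GB));
        System.out.printf(Locale.ROOT, "data / 120 GB RAM: %.1fx%n", dataToMemoryRatio(total, 120 * GB));
        System.out.printf(Locale.ROOT, "2 of 400 fields: %.1f GB%n", projectedBytes(total, 2, 400) / GB);
    }
}
```

With these figures the average document is roughly 4.7 KB, and the data is about 23 times the 60 GB of memory stated in the question (about 12 times the 120 GB figure). The query itself references only two of the roughly 400 fields, which under the equal-size assumption would be about 7 GB.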