I am running spark 1.6 on 3 VMs (i.e. 1x master; 2x slaves) all with 4 cores and 16GB RAM.
I can see the workers registered on spark-master webUI.
I want to retrieve data from my Vertica database to work on it. As I didn't manage to run complex queries I tried dummy queries to understand. We consider here an easy task.
My code is:
df = sqlContext.read.format('jdbc').options(url='xxxx', dbtable='xxx', user='xxxx', password='xxxx').load()
four = df.take(4)
And the output is (note: I replace with @IPSLAVE
the slave VM IP:Port):
16/03/08 13:50:41 INFO SparkContext: Starting job: take at <stdin>:1
16/03/08 13:50:41 INFO DAGScheduler: Got job 0 (take at <stdin>:1) with 1 output partitions
16/03/08 13:50:41 INFO DAGScheduler: Final stage: ResultStage 0 (take at <stdin>:1)
16/03/08 13:50:41 INFO DAGScheduler: Parents of final stage: List()
16/03/08 13:50:41 INFO DAGScheduler: Missing parents: List()
16/03/08 13:50:41 INFO DAGScheduler: Submitting ResultStage 0 (MapPartitionsRDD[1] at take at <stdin>:1), which has no missing parents
16/03/08 13:50:41 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 5.4 KB, free 5.4 KB)
16/03/08 13:50:41 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 2.6 KB, free 7.9 KB)
16/03/08 13:50:41 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on @IPSLAVE (size: 2.6 KB, free: 511.5 MB)
16/03/08 13:50:41 INFO SparkContext: Created broadcast 0 from broadcast at DAGScheduler.scala:1006
16/03/08 13:50:41 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 0 (MapPartitionsRDD[1] at take at <stdin>:1)
16/03/08 13:50:41 INFO TaskSchedulerImpl: Adding task set 0.0 with 1 tasks
16/03/08 13:50:41 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, @IPSLAVE, partition 0,PROCESS_LOCAL, 1922 bytes)
16/03/08 13:50:41 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on @IPSLAVE (size: 2.6 KB, free: 511.5 MB)
16/03/08 15:02:20 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 4299240 ms on @IPSLAVE (1/1)
16/03/08 15:02:20 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
16/03/08 15:02:20 INFO DAGScheduler: ResultStage 0 (take at <stdin>:1) finished in 4299.248 s
16/03/08 15:02:20 INFO DAGScheduler: Job 0 finished: take at <stdin>:1, took 4299.460581 s
As you can see it take a reaaaaaally long time. My table is actually quite big (stores around 220 millions lines, 11 fields each) but such a query would be quite instantly executed using "normal" sql (e.g. pyodbc).
I guess I am missunderstanding/missusing Spark, would you have so ideas or advice to make it work better?
While Spark supports a limited predicate pushdown over JDBC all other operations, like limit, group, aggregations are performed internally. Unfortunately it means that take(4)
will fetch data first and then apply the limit
. In other words your database will execute (assuming no projections an filters) something equivalent to:
SELECT * FROM table
and the rest will handled by Spark. There are some optimizations involved (in particular Spark evaluates partitions iteratively to obtain number of records requested by LIMIT
) but it still quite inefficient process compared to database-side optimizations.
If you want to push limit
to the database you'll have to do it statically using subquery as a dbtable
parameter:
(sqlContext.read.format('jdbc')
.options(url='xxxx', dbtable='(SELECT * FROM xxx LIMIT 4) tmp', ....))
sqlContext.read.format("jdbc").options(Map(
"url" -> "xxxx",
"dbtable" -> "(SELECT * FROM xxx LIMIT 4) tmp",
))
Please note that an alias in subquery is mandatory.
Note:
This behavior may be improved in the future, once Data Source API v2 is ready:
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With