I'm just starting using Spark SQL + Cassandra, and probably am missing something important, but one simple query takes ~45 seconds. I'm using cassanda-spark-connector
library, and run the local web server which also hosts the Spark. So my setup is roughly like this:
In sbt:
"org.apache.spark" %% "spark-core" % "1.4.1" excludeAll(ExclusionRule(organization = "org.slf4j")),
"org.apache.spark" %% "spark-sql" % "1.4.1" excludeAll(ExclusionRule(organization = "org.slf4j")),
"com.datastax.spark" %% "spark-cassandra-connector" % "1.4.0-M3" excludeAll(ExclusionRule(organization = "org.slf4j"))
In code I have a singleton that hosts SparkContext
and CassandraSQLContetx
. It's then called from the servlet. Here's how the singleton code looks like:
object SparkModel {
val conf =
new SparkConf()
.setAppName("core")
.setMaster("local")
.set("spark.cassandra.connection.host", "127.0.0.1")
val sc = new SparkContext(conf)
val sqlC = new CassandraSQLContext(sc)
sqlC.setKeyspace("core")
val df: DataFrame = sqlC.cassandraSql(
"SELECT email, target_entity_id, target_entity_type " +
"FROM tracking_events " +
"LEFT JOIN customers " +
"WHERE entity_type = 'User' AND entity_id = customer_id")
}
And here how I use it:
get("/spark") {
SparkModel.df.collect().map(r => TrackingEvent(r.getString(0), r.getString(1), r.getString(2))).toList
}
Cassandra, Spark and the web app run on the same host in virtual machine on my Macbook Pro with decent specs. Cassandra queries by themselves take 10-20 milliseconds.
When I call this endpoint for the first time, it takes 70-80 seconds to return the result. Subsequent queries take ~45 seconds. The log of the subsequent operation looks like this:
12:48:50 INFO org.apache.spark.SparkContext - Starting job: collect at V1Servlet.scala:1146
12:48:50 INFO o.a.spark.scheduler.DAGScheduler - Got job 1 (collect at V1Servlet.scala:1146) with 1 output partitions (allowLocal=false)
12:48:50 INFO o.a.spark.scheduler.DAGScheduler - Final stage: ResultStage 1(collect at V1Servlet.scala:1146)
12:48:50 INFO o.a.spark.scheduler.DAGScheduler - Parents of final stage: List()
12:48:50 INFO o.a.spark.scheduler.DAGScheduler - Missing parents: List()
12:48:50 INFO o.a.spark.scheduler.DAGScheduler - Submitting ResultStage 1 (MapPartitionsRDD[29] at collect at V1Servlet.scala:1146), which has no missing parents
12:48:50 INFO org.apache.spark.storage.MemoryStore - ensureFreeSpace(18696) called with curMem=26661, maxMem=825564856
12:48:50 INFO org.apache.spark.storage.MemoryStore - Block broadcast_1 stored as values in memory (estimated size 18.3 KB, free 787.3 MB)
12:48:50 INFO org.apache.spark.storage.MemoryStore - ensureFreeSpace(8345) called with curMem=45357, maxMem=825564856
12:48:50 INFO org.apache.spark.storage.MemoryStore - Block broadcast_1_piece0 stored as bytes in memory (estimated size 8.1 KB, free 787.3 MB)
12:48:50 INFO o.a.spark.storage.BlockManagerInfo - Added broadcast_1_piece0 in memory on localhost:56289 (size: 8.1 KB, free: 787.3 MB)
12:48:50 INFO org.apache.spark.SparkContext - Created broadcast 1 from broadcast at DAGScheduler.scala:874
12:48:50 INFO o.a.spark.scheduler.DAGScheduler - Submitting 1 missing tasks from ResultStage 1 (MapPartitionsRDD[29] at collect at V1Servlet.scala:1146)
12:48:50 INFO o.a.s.scheduler.TaskSchedulerImpl - Adding task set 1.0 with 1 tasks
12:48:50 INFO o.a.spark.scheduler.TaskSetManager - Starting task 0.0 in stage 1.0 (TID 1, localhost, NODE_LOCAL, 59413 bytes)
12:48:50 INFO org.apache.spark.executor.Executor - Running task 0.0 in stage 1.0 (TID 1)
12:48:50 INFO com.datastax.driver.core.Cluster - New Cassandra host localhost/127.0.0.1:9042 added
12:48:50 INFO c.d.s.c.cql.CassandraConnector - Connected to Cassandra cluster: Super Cluster
12:49:11 INFO o.a.spark.storage.BlockManagerInfo - Removed broadcast_0_piece0 on localhost:56289 in memory (size: 8.0 KB, free: 787.3 MB)
12:49:35 INFO org.apache.spark.executor.Executor - Finished task 0.0 in stage 1.0 (TID 1). 6124 bytes result sent to driver
12:49:35 INFO o.a.spark.scheduler.TaskSetManager - Finished task 0.0 in stage 1.0 (TID 1) in 45199 ms on localhost (1/1)
12:49:35 INFO o.a.s.scheduler.TaskSchedulerImpl - Removed TaskSet 1.0, whose tasks have all completed, from pool
12:49:35 INFO o.a.spark.scheduler.DAGScheduler - ResultStage 1 (collect at V1Servlet.scala:1146) finished in 45.199 s
As you can see from the log, the longest pauses are between these 3 lines (21 + 24 seconds):
12:48:50 INFO c.d.s.c.cql.CassandraConnector - Connected to Cassandra cluster: Super Cluster
12:49:11 INFO o.a.spark.storage.BlockManagerInfo - Removed broadcast_0_piece0 on localhost:56289 in memory (size: 8.0 KB, free: 787.3 MB)
12:49:35 INFO org.apache.spark.executor.Executor - Finished task 0.0 in stage 1.0 (TID 1). 6124 bytes result sent to driver
Apparently, I'm doing something wrong. What's that? How can I improve this?
EDIT: Important addition: the size of the tables is tiny (~200 entries for tracking_events
, ~20 for customers
), so reading them in their whole into memory shouldn't take any significant time. And it's a local Cassandra installation, no cluster, no networking is involved.
"SELECT email, target_entity_id, target_entity_type " +
"FROM tracking_events " +
"LEFT JOIN customers " +
"WHERE entity_type = 'User' AND entity_id = customer_id")
This query will read all of the data from both the tracking_events and customers table. I would compare the performance to just doing a SELECT COUNT(*) on both tables. If it is significantly different then there may be an issue but my guess is this is just the amount of time it takes to read both tables entirely into memory.
There are a few knobs for tuning how reads are done and since the defaults are oriented towards a much a bigger dataset you may want to change these.
spark.cassandra.input.split.size_in_mb approx amount of data to be fetched into a Spark partition 64 MB
spark.cassandra.input.fetch.size_in_rows number of CQL rows fetched per driver request 1000
I would make sure you are generating as many tasks as you have cores (at the minimum) so you can take advantage of all of your resources. To do this shrink the input.split.size
The fetch size controls how many rows are paged at a time by an executor core so increasing this can increase speed in some use cases.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With