
Spark SQL + Cassandra: bad performance

I'm just starting to use Spark SQL + Cassandra, and I'm probably missing something important, but one simple query takes ~45 seconds. I'm using the spark-cassandra-connector library and running a local web server which also hosts Spark. So my setup is roughly like this:

In sbt:

    "org.apache.spark" %% "spark-core" % "1.4.1" excludeAll(ExclusionRule(organization = "org.slf4j")),
    "org.apache.spark" %% "spark-sql" % "1.4.1" excludeAll(ExclusionRule(organization = "org.slf4j")),
    "com.datastax.spark" %% "spark-cassandra-connector" % "1.4.0-M3" excludeAll(ExclusionRule(organization = "org.slf4j"))

In the code I have a singleton that hosts the SparkContext and CassandraSQLContext. It's then called from the servlet. Here's what the singleton code looks like:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.cassandra.CassandraSQLContext

object SparkModel {

  val conf =
    new SparkConf()
      .setAppName("core")
      .setMaster("local")
      .set("spark.cassandra.connection.host", "127.0.0.1")

  val sc = new SparkContext(conf)
  val sqlC = new CassandraSQLContext(sc)
  sqlC.setKeyspace("core")

  val df: DataFrame = sqlC.cassandraSql(
    "SELECT email, target_entity_id, target_entity_type " +
    "FROM tracking_events " +
    "LEFT JOIN customers " +
    "WHERE entity_type = 'User' AND entity_id = customer_id")
}

And here is how I use it:

get("/spark") {
  SparkModel.df.collect().map(r => TrackingEvent(r.getString(0), r.getString(1), r.getString(2))).toList
}

Cassandra, Spark and the web app run on the same host in a virtual machine on my MacBook Pro with decent specs. Cassandra queries by themselves take 10-20 milliseconds.

When I call this endpoint for the first time, it takes 70-80 seconds to return the result. Subsequent queries take ~45 seconds. The log of a subsequent request looks like this:

12:48:50 INFO  org.apache.spark.SparkContext - Starting job: collect at V1Servlet.scala:1146
12:48:50 INFO  o.a.spark.scheduler.DAGScheduler - Got job 1 (collect at V1Servlet.scala:1146) with 1 output partitions (allowLocal=false)
12:48:50 INFO  o.a.spark.scheduler.DAGScheduler - Final stage: ResultStage 1(collect at V1Servlet.scala:1146)
12:48:50 INFO  o.a.spark.scheduler.DAGScheduler - Parents of final stage: List()
12:48:50 INFO  o.a.spark.scheduler.DAGScheduler - Missing parents: List()
12:48:50 INFO  o.a.spark.scheduler.DAGScheduler - Submitting ResultStage 1 (MapPartitionsRDD[29] at collect at V1Servlet.scala:1146), which has no missing parents
12:48:50 INFO  org.apache.spark.storage.MemoryStore - ensureFreeSpace(18696) called with curMem=26661, maxMem=825564856
12:48:50 INFO  org.apache.spark.storage.MemoryStore - Block broadcast_1 stored as values in memory (estimated size 18.3 KB, free 787.3 MB)
12:48:50 INFO  org.apache.spark.storage.MemoryStore - ensureFreeSpace(8345) called with curMem=45357, maxMem=825564856
12:48:50 INFO  org.apache.spark.storage.MemoryStore - Block broadcast_1_piece0 stored as bytes in memory (estimated size 8.1 KB, free 787.3 MB)
12:48:50 INFO  o.a.spark.storage.BlockManagerInfo - Added broadcast_1_piece0 in memory on localhost:56289 (size: 8.1 KB, free: 787.3 MB)
12:48:50 INFO  org.apache.spark.SparkContext - Created broadcast 1 from broadcast at DAGScheduler.scala:874
12:48:50 INFO  o.a.spark.scheduler.DAGScheduler - Submitting 1 missing tasks from ResultStage 1 (MapPartitionsRDD[29] at collect at V1Servlet.scala:1146)
12:48:50 INFO  o.a.s.scheduler.TaskSchedulerImpl - Adding task set 1.0 with 1 tasks
12:48:50 INFO  o.a.spark.scheduler.TaskSetManager - Starting task 0.0 in stage 1.0 (TID 1, localhost, NODE_LOCAL, 59413 bytes)
12:48:50 INFO  org.apache.spark.executor.Executor - Running task 0.0 in stage 1.0 (TID 1)
12:48:50 INFO  com.datastax.driver.core.Cluster - New Cassandra host localhost/127.0.0.1:9042 added
12:48:50 INFO  c.d.s.c.cql.CassandraConnector - Connected to Cassandra cluster: Super Cluster
12:49:11 INFO  o.a.spark.storage.BlockManagerInfo - Removed broadcast_0_piece0 on localhost:56289 in memory (size: 8.0 KB, free: 787.3 MB)
12:49:35 INFO  org.apache.spark.executor.Executor - Finished task 0.0 in stage 1.0 (TID 1). 6124 bytes result sent to driver
12:49:35 INFO  o.a.spark.scheduler.TaskSetManager - Finished task 0.0 in stage 1.0 (TID 1) in 45199 ms on localhost (1/1)
12:49:35 INFO  o.a.s.scheduler.TaskSchedulerImpl - Removed TaskSet 1.0, whose tasks have all completed, from pool 
12:49:35 INFO  o.a.spark.scheduler.DAGScheduler - ResultStage 1 (collect at V1Servlet.scala:1146) finished in 45.199 s

As you can see from the log, the longest pauses are between these 3 lines (21 + 24 seconds):

12:48:50 INFO  c.d.s.c.cql.CassandraConnector - Connected to Cassandra cluster: Super Cluster
12:49:11 INFO  o.a.spark.storage.BlockManagerInfo - Removed broadcast_0_piece0 on localhost:56289 in memory (size: 8.0 KB, free: 787.3 MB)
12:49:35 INFO  org.apache.spark.executor.Executor - Finished task 0.0 in stage 1.0 (TID 1). 6124 bytes result sent to driver

Apparently, I'm doing something wrong. What is it? How can I improve this?

EDIT: Important addition: the size of the tables is tiny (~200 entries for tracking_events, ~20 for customers), so reading them whole into memory shouldn't take any significant time. And it's a local Cassandra installation: no cluster, no networking involved.

asked Sep 27 '22 by Haspemulator


1 Answer

  "SELECT email, target_entity_id, target_entity_type " +
    "FROM tracking_events " +
    "LEFT JOIN customers " +
    "WHERE entity_type = 'User' AND entity_id = customer_id")

This query will read all of the data from both the tracking_events and customers tables. I would compare the performance to just doing a SELECT COUNT(*) on both tables. If it is significantly different, then there may be an issue, but my guess is this is just the amount of time it takes to read both tables entirely into memory.
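For reference, here is a minimal sketch of that comparison, assuming the SparkModel singleton from the question; the ad-hoc time helper is only for illustration:

    // Ad-hoc timing helper, for illustration only.
    def time[T](label: String)(block: => T): T = {
      val start = System.nanoTime()
      val result = block
      println(f"$label took ${(System.nanoTime() - start) / 1e9}%.1f s")
      result
    }

    // Compare full-table counts against the join from the question.
    time("count tracking_events") {
      SparkModel.sqlC.cassandraSql("SELECT COUNT(*) FROM tracking_events").collect()
    }
    time("count customers") {
      SparkModel.sqlC.cassandraSql("SELECT COUNT(*) FROM customers").collect()
    }
    time("join query") {
      SparkModel.df.collect()
    }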

There are a few knobs for tuning how reads are done, and since the defaults are oriented towards a much bigger dataset, you may want to change them.

spark.cassandra.input.split.size_in_mb     approximate amount of data fetched into a Spark partition    default: 64 MB
spark.cassandra.input.fetch.size_in_rows   number of CQL rows fetched per driver request                 default: 1000

I would make sure you are generating at least as many tasks as you have cores so you can take advantage of all of your resources. To do this, shrink the input.split.size_in_mb.

The fetch size controls how many rows are paged at a time by an executor core, so increasing it can increase speed in some use cases.
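For illustration, a minimal sketch of how those two properties could be set on the SparkConf from the question; the specific values are examples for a tiny local dataset, not recommendations:

    import org.apache.spark.SparkConf

    val conf =
      new SparkConf()
        .setAppName("core")
        .setMaster("local")
        .set("spark.cassandra.connection.host", "127.0.0.1")
        // Smaller splits -> more tasks, so more cores can participate.
        .set("spark.cassandra.input.split.size_in_mb", "1")
        // More CQL rows paged per request by each executor core.
        .set("spark.cassandra.input.fetch.size_in_rows", "5000")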

answered Oct 05 '22 by RussS