 

Cassandra insert performance using spark-cassandra connector

I am a newbie to Spark and Cassandra. I am trying to insert into a Cassandra table using the spark-cassandra connector, as below:

import java.util.UUID

import org.apache.spark.{SparkContext, SparkConf}
import org.joda.time.DateTime
import com.datastax.spark.connector._

case class TestEntity(id: UUID, category: String, name: String, value: Double, createDate: DateTime, tag: Long)

object SparkConnectorContext {
  val conf = new SparkConf(true).setMaster("local")
    .set("spark.cassandra.connection.host", "192.168.xxx.xxx")
  val sc = new SparkContext(conf)
}
object TestRepo {
  def insertList(list: List[TestEntity]) = {
    SparkConnectorContext.sc.parallelize(list).saveToCassandra("testKeySpace", "testColumnFamily")
  }
}
object TestApp extends App {
  val start = System.currentTimeMillis()
  // Utility (not shown) generates a List[TestEntity] of 100 random entities
  TestRepo.insertList(Utility.generateRandomData())
  val end = System.currentTimeMillis()
  val timeDiff = end - start
  println("Difference (in millis) = " + timeDiff)
}

When I insert using the above method (a list of 100 entities), it takes 300-1100 milliseconds. I tried inserting the same data using the phantom library, and it takes only 20-40 milliseconds.

Can anyone tell me why the Spark connector takes this much time for inserts? Am I doing anything wrong in my code, or is it not advisable to use the spark-cassandra connector for insert operations?

asked by Yadu Krishnan


2 Answers

It looks like you are including the parallelize operation in your timing. Also, since your Spark worker is running on a different machine than Cassandra, the saveToCassandra operation will be a write over the network.

Try configuring your system to run the Spark workers on the Cassandra nodes. Then create the RDD in a separate step and invoke an action such as count() on it to load the data into memory. You might also want to persist() or cache() the RDD to make sure it stays in memory for the test.

Then time just the saveToCassandra of that cached RDD, along the lines of the sketch below.
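For example, a minimal sketch of that measurement, reusing the SparkConnectorContext and Utility names from the question:

// build and materialize the RDD first, outside the timed section
val rdd = SparkConnectorContext.sc.parallelize(Utility.generateRandomData()).cache()
rdd.count() // forces evaluation, so the data is already in memory

// now time only the write to Cassandra
val start = System.currentTimeMillis()
rdd.saveToCassandra("testKeySpace", "testColumnFamily")
println("saveToCassandra took (millis): " + (System.currentTimeMillis() - start))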

You might also want to look at the repartitionByCassandraReplica method offered by the Cassandra connector. It partitions the data in the RDD according to which Cassandra node the writes need to go to. That way you exploit data locality and often avoid writes and shuffles over the network; a sketch follows.
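A sketch of that, building on the cached rdd above (the value 10 for partitions per host is an arbitrary illustration, and the locality benefit assumes the workers are colocated with the Cassandra nodes):

import com.datastax.spark.connector._

// group the data by the replica that owns each partition key, then write
val repartitioned = rdd.repartitionByCassandraReplica("testKeySpace", "testColumnFamily", 10)
repartitioned.saveToCassandra("testKeySpace", "testColumnFamily")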

answered by Jim Meyer


There are some serious problems with your "benchmark":

  1. Your data set is so small that you are mostly measuring job setup time. Saving 100 entities should take on the order of single milliseconds on a single node, not seconds. Saving only 100 entities also gives the JVM no chance to compile the code you run into optimized machine code.
  2. You included Spark context initialization in your measurement. The JVM loads classes lazily, so the Spark initialization code really runs after the measurement has started. This is an extremely costly step, typically performed only once per Spark application, not even once per job.
  3. You're performing the measurement only once per launch. This means the Spark context and job setup time is measured incorrectly, because the JVM has to load all the classes for the first time and HotSpot probably has no chance to kick in.

To summarize, you're very likely measuring mostly class-loading time, which depends on the size and number of classes loaded. Spark is quite a large thing to load, and a few hundred milliseconds are not surprising at all.

To measure insert performance correctly:

  • use a larger data set
  • exclude one-time setup from the measurement
  • do multiple runs sharing the same Spark context, discard a few initial ones, and continue until you reach steady-state performance (see the sketch after this list)
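A minimal sketch of such a measurement loop, assuming the objects from the question (repeated saves of the same primary keys simply upsert the same rows):

val sc = SparkConnectorContext.sc
val data = Utility.generateRandomData() // use a much larger data set in practice

// run the same job repeatedly on one shared SparkContext
val timings = (1 to 20).map { _ =>
  val start = System.currentTimeMillis()
  sc.parallelize(data).saveToCassandra("testKeySpace", "testColumnFamily")
  System.currentTimeMillis() - start
}

// discard warm-up runs (class loading, JIT compilation) and report steady state
val steady = timings.drop(5)
println("mean insert time (millis): " + steady.sum.toDouble / steady.size)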

By the way, if you enable the debug logging level, the connector logs the insert times for every partition in the executor logs.
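For example, with Spark's stock log4j configuration you could raise the connector's package to DEBUG in conf/log4j.properties (the logger name below is an assumption based on the connector's package name):

# assumption: Spark's default log4j.properties is in use
log4j.logger.com.datastax.spark.connector=DEBUG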

answered by Piotr Kołaczkowski