
How to get good performance on reading Cassandra partitions in Spark?

I am reading data from Cassandra partitions into Spark using the cassandra-connector. I tried the two solutions below for reading partitions. I tried to parallelize the task by creating as many RDDs as possible, but solution ONE and solution TWO had the same performance.

In solution ONE, I could see the stages in the Spark UI immediately. In solution TWO, I tried to avoid the for loop.

In solution TWO, the stages appear only after a considerable amount of time. Also, as the number of userids increases, there is a significant increase in the time before the stages appear in the Spark UI for solution TWO.

Version
 spark - 1.1
 DSE - 4.6
 cassandra-connector - 1.1

Setup
 3 nodes with Spark and Cassandra
 Each node has 1 core dedicated to this task.
 512 MB RAM for the executor memory.

My Cassandra table schema:

 CREATE TABLE test (
   user text,
   userid bigint,
   period timestamp,
   ip text,
   data blob,
   PRIMARY KEY((user,userid,period),ip)
   );

First solution:

 val users = List("u1","u2","u3")
 val period = List("2000-05-01","2000-05-01")
 val partitions = users.flatMap(x => period.map(y => (x,y)))
 val userids = 1 to 10
 for (userid <- userids) {
   val rdds = partitions.map(x => sc.cassandraTable("test_keyspace","table1")
                                    .select("data")
                                    .where("user=?", x._1)
                                    .where("period=?", x._2)
                                    .where("userid=?", userid)
                             )
   val combinedRdd = sc.union(rdds)
   val result = combinedRdd.map(getDataFromColumns)
                           .coalesce(4)
                           .reduceByKey((x,y) => x+y)
                           .collect()
   result.foreach(println)
 }

Second solution:

 val users = List("u1","u2","u3")
 val period = List("2000-05-01","2000-05-01")
 val userids = 1 to 10
 val partitions = users.flatMap(x => period.flatMap(
                  y => userids.map(z => (x,y,z))))

 val rdds = partitions.map(x => sc.cassandraTable("test_keyspace","table1")
                                  .select("data")
                                  .where("user=?", x._1)
                                  .where("period=?", x._2)
                                  .where("userid=?", x._3)
                           )
 val combinedRdd = sc.union(rdds)
 val result = combinedRdd.map(getDataFromColumns)
                         .coalesce(4)
                         .reduceByKey((x,y) => x+y)
                         .collect()
 result.foreach(println)

Why is solution TWO not faster than solution ONE?

My understanding is that since all the partitions are queried in one stretch and the data is distributed across the nodes, it should be faster. Please correct me if I am wrong.

Asked Jun 06 '15 by Knight71


1 Answer

To start with, you should look into joinWithCassandraTable, which should be an easier API for what you are doing (granted you have enough partitions to make it worthwhile). This API takes an RDD of partition keys and parallelizes and distributes their retrieval from C*.

This would let you do something like

sc.parallelize(partitions).joinWithCassandraTable("keyspace","table")
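
Spelled out a bit more, here is a minimal sketch of that approach against your table. It assumes spark-cassandra-connector 1.2+ (joinWithCassandraTable is not in 1.1), and the tuples are assumed to map positionally onto the (user, userid, period) partition key:

import com.datastax.spark.connector._

val users   = List("u1", "u2", "u3")
val period  = List("2000-05-01", "2000-05-01")
val userids = 1L to 10L

// One tuple per C* partition key (user, userid, period), in schema order.
val keys = for {
  user   <- users
  userid <- userids
  day    <- period
} yield (user, userid, java.sql.Timestamp.valueOf(day + " 00:00:00"))

// A single RDD: the connector fetches all the keys in parallel across the
// cluster instead of building one RDD per C* partition.
val rows = sc.parallelize(keys)
  .joinWithCassandraTable("test_keyspace", "table1")
  .select("data")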

If you wanted, you could also do a repartitionByCassandraReplica (sketched below), but this would most likely not be beneficial for very small requests. You'll have to benchmark with your data to know for sure.
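
A hedged sketch of that variant, reusing the keys RDD from above (again connector 1.2+):

// Moves each key to a Spark partition on a node that replicates it, so the
// subsequent join reads locally; 10 is the Spark-partitions-per-host knob.
val localRows = sc.parallelize(keys)
  .repartitionByCassandraReplica("test_keyspace", "table1", 10)
  .joinWithCassandraTable("test_keyspace", "table1")
  .select("data")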

If you'd like to just do raw driver commands, you can do something like the following. Note that mapPartitions needs an RDD, so the local partitions list has to be parallelized first; someQueryFor is a placeholder for whatever CQL you want to run per key:

import com.datastax.spark.connector.cql.CassandraConnector

val cc = CassandraConnector(sc.getConf)
sc.parallelize(partitions).mapPartitions { it =>
  cc.withSessionDo { session =>
    // Materialize the results while the session is still open.
    it.map(key => session.execute(someQueryFor(key))).toList.iterator
  }
}

https://github.com/datastax/spark-cassandra-connector/blob/master/doc/2_loading.md#performing-efficient-joins-with-cassandra-tables-since-12

Code sample walkthroughs:

Now let's go through a quick walk-through of your code samples. First of all, we are only retrieving 60 C* partitions. For this use case the runtime will most likely be dominated by the time it takes to set up and tear down tasks, compared to the time it takes to retrieve the partitions from C*.

In both solutions you are basically doing the same thing because of Spark's lazy evaluation. The driver creates a graph which starts with creating 60 RDDs, each with a lazy instruction to retrieve a single partition from C*. (One RDD per C* partition is bad: RDDs are meant to store large amounts of data, so this ends up being quite a lot of overhead.) Even though the 60 RDDs are made with different patterns, this really doesn't matter, because their actual evaluation won't happen until you call collect. The driver just continues to set up new RDDs and transformations (see the sketch below).
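
To make the laziness concrete, a stripped-down sketch using the same names as your first solution:

// Nothing touches C* here: each line only extends the dependency graph.
val rdds = partitions.map { case (user, day) =>
  sc.cassandraTable("test_keyspace", "table1")
    .select("data")
    .where("user=?", user)
    .where("period=?", day)
}
val graph = sc.union(rdds)   // still lazy

// Only this action schedules tasks and actually reads the partitions.
val result = graph.collect()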

Until we hit collect, absolutely nothing is done to retrieve data from C*, and since we hit collect with basically the same dependency graph in both of the solutions you posted above, the exact (or a very similar) thing will happen in both cases. All 60 RDDs will be resolved as the dependency graph specifies. This will happen in parallel, but again it will be extremely overhead-intensive.

To avoid this, check out my example above of using a single RDD to pull all of the information.

Answered Oct 11 '22 by RussS