I am reading data from Cassandra partitions into Spark using the Cassandra connector. I tried the two solutions below for reading partitions. I tried to parallelize the task by creating as many RDDs as possible, but both solution ONE and solution TWO had the same performance.
In solution ONE, I could see the stages in the Spark UI immediately. In solution TWO I tried to avoid one for loop.
In solution TWO, the stages appear only after a considerable amount of time. Also, as the number of userids increases, the delay before the stages appear in the Spark UI for solution TWO grows significantly.
Version
Spark - 1.1
DSE - 4.6
Cassandra connector - 1.1
Setup
3 nodes with Spark and Cassandra
Each node has 1 core dedicated to this task.
512 MB RAM for the executor memory.
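For reference, a minimal sketch of how those limits might be expressed as Spark configuration (the property names are standard Spark settings, not taken from the original post; in a DSE spark shell the context already exists, so this is only illustrative):
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical configuration matching the setup above: 512 MB per executor,
// at most 3 cores in total (1 per node).
val conf = new SparkConf()
  .setAppName("cassandra-partition-read")
  .set("spark.executor.memory", "512m")
  .set("spark.cores.max", "3")
val sc = new SparkContext(conf)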
My Cassandra table schema:
CREATE TABLE test (
    user text,
    userid bigint,
    period timestamp,
    ip text,
    data blob,
    PRIMARY KEY ((user, userid, period), ip)
);
Solution ONE:
val users = List("u1","u2","u3")
val period = List("2000-05-01","2000-05-01")
val partitions = users.flatMap(x => period.map(y => (x,y)))
val userids = 1 to 10

for (userid <- userids) {
  val rdds = partitions.map(x => sc.cassandraTable("test_keyspace","table1")
    .select("data")
    .where("user=?", x._1)
    .where("period=?", x._2)
    .where("userid=?", userid)
  )
  val combinedRdd = sc.union(rdds)
  val result = combinedRdd.map(getDataFromColumns)
    .coalesce(4)
    .reduceByKey((x,y) => x + y)
    .collect()
  result.foreach(println)
}
Solution TWO:
val users = List("u1","u2","u3")
val period = List("2000-05-01","2000-05-01")
val userids = 1 to 10

val partitions = users.flatMap(x => period.flatMap(
  y => userids.map(z => (x,y,z))))

val rdds = partitions.map(x => sc.cassandraTable("test_keyspace","table1")
  .select("data")
  .where("user=?", x._1)
  .where("period=?", x._2)
  .where("userid=?", x._3)
)

val combinedRdd = sc.union(rdds)
val result = combinedRdd.map(getDataFromColumns)
  .coalesce(4)
  .reduceByKey((x,y) => x + y)
  .collect()
result.foreach(println)
Why is solution TWO not faster than solution ONE?
My understanding is that since all the partitions are queried in one stretch and the data is distributed across the nodes, it should be faster. Please correct me if I am wrong.
One important way to increase the parallelism of Spark processing is to increase the number of executors on the cluster. However, knowing how the data should be distributed so that the cluster can process it efficiently is extremely important, and the key to achieving this is partitioning in Spark.
Regardless of where you run your workloads, there are two approaches for integrating Spark and Cassandra: run a separate cluster for each tool, or run them in the same cluster, which is the setup in question here.
The fundamental idea is quite simple: the Spark and Cassandra clusters are deployed to the same set of machines. Cassandra stores the data; Spark worker nodes are co-located with Cassandra and do the data processing. Spark is a batch-processing system, designed to deal with large amounts of data.
To start with, you should look into joinWithCassandraTable, which should be an easier API for what you are doing (granted you have enough partitions to make it worthwhile). This API takes an RDD of partition keys and parallelizes and distributes their retrieval from C*.
This would let you do something like
sc.parallelize(partitions).joinWithCassandraTable("keyspace","table")
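For instance, here is a hedged sketch of building that single RDD from the question's own keys. It assumes connector 1.2+, the test_keyspace/table1 names from the question, and that the tuple elements map positionally onto the partition key columns (user, userid, period):
import com.datastax.spark.connector._   // joinWithCassandraTable, SomeColumns
import java.text.SimpleDateFormat

val users   = List("u1", "u2", "u3")
val periods = List("2000-05-01", "2000-05-01")
val userids = 1 to 10
val fmt     = new SimpleDateFormat("yyyy-MM-dd")

// One element per C* partition key, in partition-key column order (user, userid, period).
val keys = for (u <- users; p <- periods; id <- userids) yield (u, id.toLong, fmt.parse(p))

// A single RDD that fetches all 60 partitions; the connector distributes the
// lookups across executors instead of building 60 separate RDDs.
val rows = sc.parallelize(keys)
  .joinWithCassandraTable("test_keyspace", "table1", SomeColumns("data"))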
If you wanted, you could also do a repartitionByCassandraReplica first, but this would most likely not be beneficial for very small requests. You'll have to benchmark with your data to know for sure.
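As a sketch (same assumptions as above, reusing the keys list from the previous snippet), the replica-aware variant would look roughly like:
// Group the keys so each Spark partition holds keys owned by a local C*
// replica, then join. The extra shuffle usually only pays off for large key sets.
val localizedRows = sc.parallelize(keys)
  .repartitionByCassandraReplica("test_keyspace", "table1")
  .joinWithCassandraTable("test_keyspace", "table1", SomeColumns("data"))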
If you'd like to just issue raw driver commands, you can do something like
import com.datastax.spark.connector.cql.CassandraConnector

val cc = CassandraConnector(sc.getConf)
sc.parallelize(partitions).mapPartitions { it =>
  cc.withSessionDo { session =>
    session.execute( Some query )   // placeholder: substitute your CQL statement here
  }
}
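For concreteness, a minimal sketch of what that block might contain for the question's table, assuming the (user, userid, period) keys built earlier (the SELECT statement itself is illustrative, not from the original answer):
import com.datastax.spark.connector.cql.CassandraConnector
import com.datastax.driver.core.utils.Bytes
import scala.collection.JavaConverters._

val cc = CassandraConnector(sc.getConf)

val blobs = sc.parallelize(keys).mapPartitions { it =>
  cc.withSessionDo { session =>
    val stmt = session.prepare(
      "SELECT data FROM test_keyspace.table1 WHERE user = ? AND userid = ? AND period = ?")
    // Materialise the results with toList before the session is released.
    it.flatMap { case (user, userid, ts) =>
      session.execute(stmt.bind(user, userid: java.lang.Long, ts))
        .all().asScala
        .map(row => Bytes.getArray(row.getBytes("data")))   // blob column as Array[Byte]
    }.toList.iterator
  }
}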
https://github.com/datastax/spark-cassandra-connector/blob/master/doc/2_loading.md#performing-efficient-joins-with-cassandra-tables-since-12
Now let's go through a quick walk-through of your code samples. First of all, we are only retrieving 60 C* partitions. For this use case we will most likely be dominated by the time it takes to set up and tear down tasks rather than the time it takes to retrieve the partitions from C*.
In both solutions you are basically doing the same thing because of Spark's lazy evaluation. The driver creates a graph which starts by creating 60 RDDs, each with a lazy instruction to retrieve a single partition from C*. (One RDD per partition is bad; RDDs are meant to hold large amounts of data, so this ends up being quite a lot of overhead.) Even though the 60 RDDs are built with different patterns, this really doesn't matter, because their actual evaluation won't happen until you call collect. The driver just continues to set up new RDDs and transformations.
Until we hit collect, absolutely nothing is done to retrieve data from C*, and since we hit collect with basically the same dependency graph in both solutions you posted above, the exact same (or a very similar) thing will happen in both cases. All 60 RDDs will be resolved as the dependency graph specifies. This will happen in parallel, but again it will be extremely overhead-intensive.
To avoid this, check out my example above of using a single RDD to pull all of the information.
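Put together, the question's pipeline could then hang off that one joined RDD instead of 60 separate ones (a hedged sketch; getDataFromColumns is the question's own function and is assumed here to turn a row into the (key, value) pair that reduceByKey expects):
// Elements of the joined RDD are (keyTuple, CassandraRow) pairs.
val result = sc.parallelize(keys)
  .joinWithCassandraTable("test_keyspace", "table1", SomeColumns("data"))
  .map { case (_, row) => getDataFromColumns(row) }
  .reduceByKey((x, y) => x + y)
  .collect()

result.foreach(println)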