I have set up Spark 2.0 and Cassandra 3.0 on a local machine (8 cores, 16GB RAM) for testing purposes and edited spark-defaults.conf as follows:
spark.python.worker.memory 1g
spark.executor.cores 4
spark.executor.instances 4
spark.sql.shuffle.partitions 4
Next I imported 1.5 million rows into Cassandra:
CREATE TABLE test (
    tid int,
    cid int,
    pid int,
    ev list<double>,
    PRIMARY KEY (tid)
);
test.ev is a list containing numeric values, e.g. [2240,2081,159,304,1189,1125,1779,693,2187,1738,546,496,382,1761,680]
Now in the code, to test the whole thing, I just created a SparkSession, connected to Cassandra and made a simple select count:
cassandra = spark.read.format("org.apache.spark.sql.cassandra")
df = cassandra.load(keyspace="testks",table="test")
df.select().count()
At this point, Spark outputs the count and takes about 28 seconds to finish the Job, distributed in 13 Tasks (in the Spark UI, the total Input for the Tasks is 331.6MB).
Questions:

Is that the expected performance? If not, what am I missing?

Theory says the number of partitions of a DataFrame determines the number of tasks Spark will distribute the job in. If I am setting spark.sql.shuffle.partitions to 4, why is it creating 13 Tasks? (I also made sure of the number of partitions by calling rdd.getNumPartitions() on my DataFrame.)

Update
A common operation I would like to test over this data: group by pid and average the values of ev, a list<double>:

from pyspark.sql.functions import avg
df.groupBy('pid').agg(avg(df['ev'][1]))
As @zero323 suggested, I deployed an external machine (2GB RAM, 4 cores, SSD) with Cassandra just for this test, and loaded the same data set. The result of the df.select().count() was an expected greater latency and overall poorer performance in comparison with my previous test (it took about 70 seconds to finish the Job).
Edit: I misunderstood his suggestion. @zero323 meant to let Cassandra perform the count instead of using Spark SQL, as explained here.
Also, I want to point out that I am aware of the inherent anti-pattern of using a list<double> instead of a wide row for this type of data, but my concern at this moment is more the time spent on retrieval of a large dataset than the actual average computation time.
Is that the expected performance? If not, what am I missing?
It looks slowish but it is not exactly unexpected. In general, count is expressed as

SELECT 1 FROM table

followed by Spark-side summation. So while it is optimized, it is still rather inefficient, because you have to fetch N long integers from the external source just to sum them locally.
As explained in the docs, Cassandra-backed RDDs (not Datasets) provide an optimized cassandraCount method which performs server-side counting.
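For illustration, a minimal sketch of the server-side count from Python. This assumes the third-party pyspark-cassandra package, which mirrors the connector's RDD API in PySpark (cassandraCount is documented for the connector's Scala API; its availability through this package is an assumption here). The keyspace and table names are the ones from the question:

# Assumption: pyspark-cassandra is on the classpath (e.g. added via --packages).
from pyspark import SparkConf
from pyspark_cassandra import CassandraSparkContext

conf = SparkConf().set("spark.cassandra.connection.host", "127.0.0.1")
sc = CassandraSparkContext(conf=conf)

# cassandraCount() pushes the counting down to Cassandra, so only the final
# number travels over the wire instead of one value per row.
sc.cassandraTable("testks", "test").cassandraCount()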
Theory says the number of partitions of a DataFrame determines the number of tasks Spark will distribute the job in. If I am setting the spark.sql.shuffle.partitions to (...), why is it creating (...) Tasks?
Because spark.sql.shuffle.partitions is not used here. This property is used to determine the number of partitions for shuffles (when data is aggregated by some set of keys), not for Dataset creation or for global aggregations like count(*) (which always use 1 partition for the final aggregation).
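To make the distinction concrete, here is a small sketch (reusing the keyspace, table and column names from the question) of where the property does and does not apply:

spark.conf.set("spark.sql.shuffle.partitions", 4)

df = spark.read.format("org.apache.spark.sql.cassandra") \
    .load(keyspace="testks", table="test")

# Initial partitioning is driven by the input source, not by the property:
df.rdd.getNumPartitions()                          # e.g. 13

# The property applies once a shuffle (a keyed aggregation) happens:
df.groupBy("pid").count().rdd.getNumPartitions()   # 4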
If you are interested in controlling the number of initial partitions, you should take a look at spark.cassandra.input.split.size_in_mb, which is defined as:
Approx amount of data to be fetched into a Spark partition. Minimum number of resulting Spark partitions is 1 + 2 * SparkContext.defaultParallelism
As you can see, another factor here is spark.default.parallelism, but it is not exactly a subtle configuration, so depending on it is in general not an optimal choice.
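As a sketch, the split size can be set when building the session (the 48MB value below is arbitrary, purely for illustration; smaller values produce more, smaller initial partitions for the Cassandra scan):

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("cassandra-count-test") \
    .config("spark.cassandra.input.split.size_in_mb", "48") \
    .getOrCreate()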