 

Spark: PySpark + Cassandra query performance

I have set up Spark 2.0 and Cassandra 3.0 on a local machine (8 cores, 16 GB RAM) for testing purposes and edited spark-defaults.conf as follows:

spark.python.worker.memory 1g
spark.executor.cores 4
spark.executor.instances 4
spark.sql.shuffle.partitions 4

Next I imported 1.5 million rows into Cassandra, using the following table:

CREATE TABLE test (
    tid int,
    cid int,
    pid int,
    ev list<double>,
    PRIMARY KEY (tid)
);

test.ev is a list of numeric values, e.g. [2240,2081,159,304,1189,1125,1779,693,2187,1738,546,496,382,1761,680]

Now, to test the whole thing, I just created a SparkSession, connected to Cassandra and made a simple select count:

cassandra = spark.read.format("org.apache.spark.sql.cassandra")
df = cassandra.load(keyspace="testks",table="test")
df.select().count()

At this point, Spark outputs the count and takes about 28 seconds to finish the job, distributed across 13 tasks (in the Spark UI, the total Input for the tasks is 331.6 MB).

Questions:

  • Is that the expected performance? If not, what am I missing?
  • Theory says the number of partitions of a DataFrame determines the number of tasks Spark will distribute the job across. If I am setting spark.sql.shuffle.partitions to 4, why is it creating 13 tasks? (I also checked the number of partitions by calling rdd.getNumPartitions() on my DataFrame.)

Update

A common operation I would like to test over this data:

  • Query a large data set, say from 100,000 to N rows, grouped by pid
  • Select ev, a list<double>
  • Perform an average on each member, assuming by now each list has the same length, e.g. df.groupBy('pid').agg(avg(df['ev'][1])) (a sketch follows below)
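
For reference, here is a minimal sketch of that per-element average, assuming every ev list has the same fixed length (EV_LEN is a hypothetical constant and df is the DataFrame loaded above):

from pyspark.sql.functions import avg, col

EV_LEN = 15  # assumed fixed length of every ev list
per_index_avgs = [avg(col("ev")[i]).alias("ev_%d_avg" % i) for i in range(EV_LEN)]
result = df.groupBy("pid").agg(*per_index_avgs)
result.show()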

As @zero323 suggested, I deployed an external machine (2 GB RAM, 4 cores, SSD) running Cassandra just for this test, and loaded the same data set. As expected, df.select().count() showed greater latency and overall poorer performance than my previous test (it took about 70 seconds to finish the job).

Edit: I misunderstood his suggestion. @zero323 meant letting Cassandra perform the count instead of using Spark SQL, as explained here.
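
For completeness, one way to push the count down to Cassandra is to issue it as a plain CQL query instead of going through Spark. A minimal sketch using the DataStax cassandra-driver (the contact point is an assumption; keyspace and table names are from the question):

from cassandra.cluster import Cluster

cluster = Cluster(["127.0.0.1"])  # assumed contact point
session = cluster.connect("testks")
row = session.execute("SELECT count(*) FROM test").one()
print(row.count)
cluster.shutdown()

Note that a full-table count(*) in CQL can hit read timeouts on larger tables, so this is mainly useful as a sanity check here.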

I also wanted to point out that I am aware of the inherent anti-pattern of using a list<double> instead of a wide row for this type of data, but my concern at this moment is more the time spent retrieving a large dataset than the actual average computation time.

TMichel asked Sep 19 '16




1 Answer

Is that the expected performance? If not, what am I missing?

It looks slowish, but it is not exactly unexpected. In general, count is expressed as

SELECT 1 FROM table

followed by a Spark-side summation. So while it is optimized, it is still rather inefficient, because you have to fetch N long integers from the external source just to sum them locally.
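
One way to see this in practice (a sketch; the exact output varies by Spark version) is to inspect the plan of the equivalent DataFrame aggregation:

# df.groupBy().count() is the DataFrame equivalent of the count() action;
# the plan shows a partial count per scan partition followed by a
# single-partition final aggregation on the Spark side
df.groupBy().count().explain()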

As explained in the docs, Cassandra-backed RDDs (not Datasets) provide an optimized cassandraCount method which performs server-side counting.

Theory says the number of partitions of a DataFrame determines the number of tasks Spark will distribute the job across. If I am setting spark.sql.shuffle.partitions to (...), why is it creating (...) tasks?

Because spark.sql.shuffle.partitions is not used here. This property is used to determine the number of partitions for shuffles (when data is aggregated by some set of keys), not for Dataset creation or for global aggregations like count(*) (which always uses 1 partition for the final aggregation).
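
A quick way to see the difference, reusing df from the question (a sketch; the scan partition count is whatever the connector decides, 13 in this case):

spark.conf.set("spark.sql.shuffle.partitions", "4")  # already 4 via spark-defaults.conf, shown for clarity

print(df.rdd.getNumPartitions())  # partitions of the initial Cassandra scan (13 here)
print(df.groupBy("pid").count().rdd.getNumPartitions())  # 4, governed by spark.sql.shuffle.partitions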

If you are interested in controlling the number of initial partitions, you should take a look at spark.cassandra.input.split.size_in_mb, which defines:

Approx amount of data to be fetched into a Spark partition. Minimum number of resulting Spark partitions is 1 + 2 * SparkContext.defaultParallelism

As you can see, another factor here is spark.default.parallelism, but it is not exactly a subtle configuration, so depending on it is in general not an optimal choice.
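
For example, here is a sketch of setting the split size when the session is created (the host and the 32 MB value are arbitrary assumptions, not recommendations):

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("cassandra-test") \
    .config("spark.cassandra.connection.host", "127.0.0.1") \
    .config("spark.cassandra.input.split.size_in_mb", "32") \
    .getOrCreate()

A smaller split size yields more (and smaller) input partitions for the initial scan; whether that actually helps depends on the data size and the hardware.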

zero323 answered Nov 15 '22