I'm running a simple application on a Spark/Cassandra cluster. Since moving to a new environment (Spark 1.5 instead of 1.2, plus a minor Cassandra version upgrade), I've observed a substantial performance degradation (from 4 seconds to 1-5 minutes for the same task and the same amount of data).
After an initial investigation it seems that, for exactly the same code, the Spark driver now generates many more tasks (20k+, where it used to be at most 5), and the logs on the executor side reflect the same situation:
many sequential executions of the same query on different partitions:
...
CassandraTableScanRDD: Fetched 0 rows from x.y for partition 20324 in 0.138 s.
CassandraTableScanRDD: Fetched 0 rows from x.y for partition 20327 in 0.058 s.
CassandraTableScanRDD: Fetched 0 rows from x.y for partition 20329 in 0.053 s.
...
where it used to be a single one:
CassandraTableScanRDD: Fetched 905 rows from x.y for partition 0 in 2.992 s.
Since the application code is the same, what could have caused such a difference in partitioning behavior, and what can be done to remedy it?
NB: the setup of the two environments is different; configuration is not shared or inherited.
Thanks.
The new version of the Spark Cassandra Connector uses a system table (system.size_estimates) present in more modern Cassandra versions to estimate split size. This table is refreshed periodically (currently every 5 minutes), although the number of splits you are seeing is extremely large. The estimated table size read from this table is divided by your configured split size to determine the number of splits.
If you are using C* older than 2.1.5, this table does not exist and the partitioning needs to be configured manually.
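If you are on 2.1.5 or newer, you can check what the connector actually sees in that table. A rough sketch from the Spark shell (the keyspace x and table y are just the names from your log lines, adjust to your schema):

import scala.collection.JavaConverters._
import com.datastax.spark.connector.cql.CassandraConnector

// Read the size estimates the connector uses to compute splits.
// One row per token range: mean_partition_size (bytes) and partitions_count.
CassandraConnector(sc.getConf).withSessionDo { session =>
  val rows = session.execute(
    "SELECT range_start, range_end, mean_partition_size, partitions_count " +
    "FROM system.size_estimates WHERE keyspace_name = 'x' AND table_name = 'y'"
  ).all().asScala
  rows.foreach(println)
  // Summing mean_partition_size * partitions_count over all rows gives the
  // estimated table size that gets divided by the split size.
}

If the estimates look wildly off for the amount of data you actually have, that points at this mechanism as the source of the 20k+ tasks.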
https://github.com/datastax/spark-cassandra-connector/blob/master/doc/FAQ.md#what-does-inputsplitsize_in_mb-use-to-determine-size
If you continue to see issues, you can manually pass in the number of splits via ReadConf.
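For example, something along these lines (a sketch, not tested against your setup; it assumes a connector version whose ReadConf exposes a splitCount field, and again uses the x/y names from your logs):

import org.apache.spark.{SparkConf, SparkContext}
import com.datastax.spark.connector._
import com.datastax.spark.connector.rdd.ReadConf

// Option 1: raise the target split size (in MB) so the estimated table size
// produces far fewer splits. Must be set before the SparkContext is created.
val conf = new SparkConf()
  .set("spark.cassandra.connection.host", "127.0.0.1") // assumption: your Cassandra host
  .set("spark.cassandra.input.split.size_in_mb", "256")
val sc = new SparkContext(conf)

// Option 2: pin the number of splits for one particular table scan via ReadConf.
val rdd = sc.cassandraTable("x", "y")
  .withReadConf(ReadConf(splitCount = Some(1)))

Option 2 is the closer match to your old behavior (a single partition scan), but a fixed split count also removes any parallelism the connector would otherwise give you, so treat it as a workaround rather than a permanent setting.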