 

How to push down a limit predicate to Cassandra when using DataFrames?

I have a large Cassandra table. I want to load only 50 rows from it. The following code:

val ds = sparkSession.read
      .format("org.apache.spark.sql.cassandra")
      .options(Map("table" -> s"$Aggregates", "keyspace" -> s"$KeySpace"))
      .load()
      .where(col("aggregate_type") === "DAY")
      .where(col("start_time") <= "2018-03-28")
      .limit(50).collect()

This code pushes down both predicates from the where methods, but not the limit. Is it true that the whole dataset (1 million records) is being fetched? If not, why is the run time of this code roughly the same as that of the code without limit(50)?

asked Mar 28 '18 by addmeaning

1 Answer

Unlike Spark Streaming, Spark itself tries to preload as much data as it can, as fast as it can, so that it can operate on it in parallel. So preloading is lazy, but greedy once it's triggered. There are, however, cassandra-connector-specific factors:

  • Automatic predicate pushdown of valid "where" clauses.

  • According to this answer, limit(...) is not translated to CQL's LIMIT, so its behavior depends on how many fetching jobs are created after enough data is downloaded. Quote:

calling limit will allow Spark to skip reading some portions from the underlying DataSource. These would limit the amount of data read from Cassandra by canceling tasks from being executed.

Possible solutions:

  • DataFrame limits could be partially managed by limiting numPartitions and the data exchange rate (concurrent.reads and other params); a configuration sketch follows the code example below. If you're okay with n ~ 50 "in most cases", you could also add a filter like where(dayIndex < 50 * factor * num_records).

  • There is a way to set a CQL LIMIT through SparkPartitionLimit, which directly affects every CQL request (see more) - keep in mind that requests are per Spark partition. It's available in the CassandraRDD extension class, so you would have to convert to an RDD first.

The code would be something like this (assuming com.datastax.spark.connector.rdd.CassandraRDD is imported):

// appends LIMIT n to each per-partition CQL request; take(n) trims the combined result
filteredDataFrame.rdd.asInstanceOf[CassandraRDD[Row]].limit(n).take(n)

This appends LIMIT $N to every CQL request. Unlike DataFrame's limit, if you specify the CassandraRDD limit several times (.limit(10).limit(20)), only the last one is appended. Also, I used n instead of n / numPartitions + 1 because (even if Spark and Cassandra partitions are one-to-one) each partition might return fewer results. As a result, I had to add take(n) in order to cut the result from at most numPartitions * n rows down to n.
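For the first of the possible solutions above, a minimal sketch of throttling the read through the DataFrameReader options is shown below. The option names (spark.cassandra.input.split.size_in_mb, spark.cassandra.input.fetch.size_in_rows) come from the DataStax spark-cassandra-connector and vary slightly between connector versions, so treat the exact keys and values as assumptions to verify against your version's reference:

val throttled = sparkSession.read
  .format("org.apache.spark.sql.cassandra")
  .options(Map(
    "table"    -> s"$Aggregates",
    "keyspace" -> s"$KeySpace",
    // larger splits => fewer Spark partitions created for the scan
    "spark.cassandra.input.split.size_in_mb" -> "512",
    // rows fetched from Cassandra per round trip
    "spark.cassandra.input.fetch.size_in_rows" -> "1000"
  ))
  .load()
  .where(col("aggregate_type") === "DAY")
  .where(col("start_time") <= "2018-03-28")
  .limit(50)

This does not push the limit down either; it only reduces how aggressively data is pulled before the limit cuts it off.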

Warning: double-check that your where clauses are translatable to CQL (using explain()) - otherwise the LIMIT would be applied before filtering.
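For example, a quick way to check (reusing the ds pipeline from the question without the trailing limit(50).collect()) is to print the physical plan and look for your predicates under PushedFilters in the Cassandra scan node:

sparkSession.read
  .format("org.apache.spark.sql.cassandra")
  .options(Map("table" -> s"$Aggregates", "keyspace" -> s"$KeySpace"))
  .load()
  .where(col("aggregate_type") === "DAY")
  .where(col("start_time") <= "2018-03-28")
  .explain()  // pushed-down predicates show up as PushedFilters in the scan node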

P.S. You could also try running CQL directly using sparkSession.sql(...) (like here) and compare the results.
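A rough sketch of that comparison: register the table as a temporary view and query it with Spark SQL (note this still goes through the connector rather than issuing CQL by hand, and aggregates_view is just a placeholder name):

sparkSession.read
  .format("org.apache.spark.sql.cassandra")
  .options(Map("table" -> s"$Aggregates", "keyspace" -> s"$KeySpace"))
  .load()
  .createOrReplaceTempView("aggregates_view")

sparkSession.sql(
  "SELECT * FROM aggregates_view " +
  "WHERE aggregate_type = 'DAY' AND start_time <= '2018-03-28' LIMIT 50"
).show()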

answered Nov 10 '22 by dk14