I'm trying to filter on a small part of a huge C* table by using:
val snapshotsFiltered = sc.parallelize(startDate to endDate).map(TableKey(_)).joinWithCassandraTable("listener","snapshots_tspark")
println("Done Join")
//*******
//get only the snapshots and create rdd temp table
val jsons = snapshotsFiltered.map(_._2.getString("snapshot"))
val jsonSchemaRDD = sqlContext.jsonRDD(jsons)
jsonSchemaRDD.registerTempTable("snapshots_json")
With:
case class TableKey(created: Long) // primary key is (created, imei, when): created = partition key; imei, when = clustering keys
And the cassandra table schema is:
CREATE TABLE listener.snapshots_tspark (
created timestamp,
imei text,
when timestamp,
snapshot text,
PRIMARY KEY (created, imei, when) ) WITH CLUSTERING ORDER BY (imei ASC, when ASC)
AND bloom_filter_fp_chance = 0.01
AND caching = '{"keys":"ALL", "rows_per_partition":"NONE"}'
AND comment = ''
AND compaction = {'min_threshold': '4', 'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy', 'max_threshold': '32'}
AND compression = {'sstable_compression': 'org.apache.cassandra.io.compress.LZ4Compressor'}
AND dclocal_read_repair_chance = 0.1
AND default_time_to_live = 0
AND gc_grace_seconds = 864000
AND max_index_interval = 2048
AND memtable_flush_period_in_ms = 0
AND min_index_interval = 128
AND read_repair_chance = 0.0
AND speculative_retry = '99.0PERCENTILE';
The problem is that the process freezes after printing "Done Join", with no errors shown in the Spark master UI:
[Stage 0:> (0 + 2) / 2]
Won't the join work with a timestamp as the partition key? Why does it freeze?
Spark/PySpark creates a task for each partition. Shuffle operations move data from one partition to other partitions. Repartitioning is expensive because it triggers a data shuffle, in which data may move between nodes. By default, DataFrame shuffle operations create 200 partitions.
Both getNumPartitions calls from the above examples return the same number of partitions. Although reduceByKey() triggers a data shuffle, it doesn't change the partition count, because an RDD inherits the number of partitions from its parent RDD. Your partition counts may differ depending on your setup and how Spark creates partitions.
The results of the map tasks are kept in memory. When the results do not fit in memory, Spark stores the data on disk. Spark shuffles the mapped data across partitions, and sometimes it also stores the shuffled data on disk so it can be reused when something needs to be recalculated. Finally, it runs the reduce tasks on each partition, based on key.
It is recommended that you call repartitionByCassandraReplica before joinWithCassandraTable to obtain data locality, so that each Spark partition only needs to query its local node. Note that the connector uses these methods under the hood when you use the Dataset or DataFrame API.
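As a rough sketch of that recommendation, the join from the question could be preceded by a repartition step like this. It assumes the spark-cassandra-connector is on the classpath and a Cassandra cluster is reachable; `LocalJoin` and `joinLocally` are made-up names for illustration, and the keyspace, table, and `TableKey` come from the question.

```scala
import com.datastax.spark.connector._
import org.apache.spark.SparkContext

// Key class from the question; the field name must match the partition key column.
case class TableKey(created: Long)

object LocalJoin {
  // Hypothetical helper: move each key to a Spark partition on a node that
  // owns a replica of it, then join, so each task mostly queries its local node.
  def joinLocally(sc: SparkContext, keys: Seq[Long]) =
    sc.parallelize(keys)
      .map(TableKey(_))
      .repartitionByCassandraReplica("listener", "snapshots_tspark")
      .joinWithCassandraTable("listener", "snapshots_tspark")
}
```

This only improves locality. It does not reduce the number of keys being looked up, so a very large key set will still take a long time.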
By using:
sc.parallelize(startDate to endDate)
With startDate and endDate as Longs generated from dates in the format:
("yyyy-MM-dd HH:mm:ss")
I made Spark build a huge array (100,000+ objects) to join with the C* table. It wasn't actually stuck at all: C* was working hard to perform the join and return the data.
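To get a feel for how quickly an inclusive range of Longs blows up, here is a small sketch that counts the keys a range would produce. It assumes the Longs are epoch seconds in UTC (the unit and zone actually used are not stated above), and `KeyCount`, `toEpochSeconds`, and `keyCount` are made-up names for illustration.

```scala
import java.time.{LocalDateTime, ZoneOffset}
import java.time.format.DateTimeFormatter

object KeyCount {
  private val fmt = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")

  // Parse a timestamp string into epoch seconds (UTC is an assumption of this sketch).
  def toEpochSeconds(s: String): Long =
    LocalDateTime.parse(s, fmt).toEpochSecond(ZoneOffset.UTC)

  // Number of keys in the inclusive range [start, end] when stepping by stepSeconds.
  def keyCount(start: String, end: String, stepSeconds: Long): Long =
    (toEpochSeconds(end) - toEpochSeconds(start)) / stepSeconds + 1

  def main(args: Array[String]): Unit = {
    val start = "2015-10-29 12:00:00"
    val end   = "2015-10-29 15:00:00"
    println(keyCount(start, end, 1L))    // one key per second: 10801
    println(keyCount(start, end, 3600L)) // one key per hour: 4
  }
}
```

Every one of those keys becomes a separate partition lookup in the join, so even a three-hour window at one-second granularity means thousands of queries, most of which probably match nothing.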
Finally, I changed my range to:
case class TableKey(created_dh: String)
val data = Array("2015-10-29 12:00:00", "2015-10-29 13:00:00", "2015-10-29 14:00:00", "2015-10-29 15:00:00")
val snapshotsFiltered = sc.parallelize(data, 2).map(TableKey(_)).joinWithCassandraTable("listener","snapshots_tnew")
And it is ok now.
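If you don't want to hard-code the hourly keys, something like the following could generate them. This is only a sketch: `HourlyKeys` and `hourlyKeys` are made-up names, and it assumes both timestamps fall exactly on hour boundaries and that the key strings use the "yyyy-MM-dd HH:mm:ss" format.

```scala
import java.time.LocalDateTime
import java.time.format.DateTimeFormatter

object HourlyKeys {
  private val fmt = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")

  // Build one key string per hour for the inclusive range [start, end].
  def hourlyKeys(start: String, end: String): List[String] = {
    val first = LocalDateTime.parse(start, fmt)
    val last  = LocalDateTime.parse(end, fmt)
    Iterator.iterate(first)(_.plusHours(1)) // first, first + 1h, first + 2h, ...
      .takeWhile(t => !t.isAfter(last))     // stop once we pass the end
      .map(_.format(fmt))
      .toList
  }
}
```

The result of `HourlyKeys.hourlyKeys("2015-10-29 12:00:00", "2015-10-29 15:00:00")` is the same four strings as the array above, and could be passed to `sc.parallelize` in its place.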