
Spark JoinWithCassandraTable on TimeStamp partition key STUCK

I'm trying to filter on a small part of a huge C* table by using:

    val snapshotsFiltered = sc.parallelize(startDate to endDate).map(TableKey(_)).joinWithCassandraTable("listener","snapshots_tspark")

    println("Done Join")
    //*******
    //get only the snapshots and create rdd temp table
    val jsons = snapshotsFiltered.map(_._2.getString("snapshot"))
    val jsonSchemaRDD = sqlContext.jsonRDD(jsons)
    jsonSchemaRDD.registerTempTable("snapshots_json")

With:

    case class TableKey(created: Long) // (created, imei, when) --> created = partition key | imei, when = clustering keys
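For context: joinWithCassandraTable matches case class fields to Cassandra columns by name and, by default, joins on the partition key. A sketch making the join column explicit via the connector's SomeColumns selector (variable name joined is mine, tables as above):

    import com.datastax.spark.connector._

    // Join on just the partition key column; imei/when remain clustering columns.
    val joined = sc.parallelize(startDate to endDate)
      .map(TableKey(_))
      .joinWithCassandraTable("listener", "snapshots_tspark")
      .on(SomeColumns("created"))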

And the Cassandra table schema is:

    CREATE TABLE listener.snapshots_tspark (
        created timestamp,
        imei text,
        when timestamp,
        snapshot text,
        PRIMARY KEY (created, imei, when)
    ) WITH CLUSTERING ORDER BY (imei ASC, when ASC)
        AND bloom_filter_fp_chance = 0.01
        AND caching = '{"keys":"ALL", "rows_per_partition":"NONE"}'
        AND comment = ''
        AND compaction = {'min_threshold': '4', 'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy', 'max_threshold': '32'}
        AND compression = {'sstable_compression': 'org.apache.cassandra.io.compress.LZ4Compressor'}
        AND dclocal_read_repair_chance = 0.1
        AND default_time_to_live = 0
        AND gc_grace_seconds = 864000
        AND max_index_interval = 2048
        AND memtable_flush_period_in_ms = 0
        AND min_index_interval = 128
        AND read_repair_chance = 0.0
        AND speculative_retry = '99.0PERCENTILE';
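Since created is a timestamp partition key, the Longs fed into TableKey presumably need to be epoch-millisecond values, which is how Cassandra represents timestamps. A minimal sketch of building them, assuming "yyyy-MM-dd HH:mm:ss" strings as input (the dates are illustrative):

    import java.text.SimpleDateFormat

    // Parse "yyyy-MM-dd HH:mm:ss" strings into epoch-millisecond Longs,
    // the representation behind a Cassandra `timestamp` partition key.
    val fmt = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
    val startDate: Long = fmt.parse("2015-10-25 00:00:00").getTime
    val endDate: Long = fmt.parse("2015-10-25 01:00:00").getTime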

The problem is that the process freezes after the println completes, with no errors on the Spark master UI:

    [Stage 0:>                                                                                                                                (0 + 2) / 2]

Won't the join work with a timestamp as the partition key? Why does it freeze?

Asked Oct 25 '15 by Reshef



1 Answer

By using:

sc.parallelize(startDate to endDate)

With startDate and endDate as Longs generated from Dates using the format:

("yyyy-MM-dd HH:mm:ss")

I made Spark build a huge array (100,000+ objects) to join with the C* table, and it wasn't stuck at all; C* was working hard to make the join happen and return the data.
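For a sense of scale: if startDate and endDate are epoch-millisecond Longs, then startDate to endDate enumerates one key per millisecond, each triggering its own partition lookup in Cassandra. A rough, illustrative check (dates assumed, not from the original job):

    import java.text.SimpleDateFormat

    val fmt = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
    val start = fmt.parse("2015-10-29 12:00:00").getTime
    val end = fmt.parse("2015-10-29 15:00:00").getTime

    // One Long per millisecond: a 3-hour window alone is ~10.8 million keys.
    println(end - start + 1) // 10800001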

Finally, I changed my range to:

    case class TableKey(created_dh: String)

    val data = Array("2015-10-29 12:00:00", "2015-10-29 13:00:00",
      "2015-10-29 14:00:00", "2015-10-29 15:00:00")
    val snapshotsFiltered = sc.parallelize(data, 2).map(TableKey(_))
      .joinWithCassandraTable("listener", "snapshots_tnew")

And it works fine now.
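If the window grows, the hourly strings don't have to be hard-coded; a sketch of generating them, assuming Java 8's java.time is available (the hourlyKeys helper is my own illustration, not part of the connector):

    import java.time.LocalDateTime
    import java.time.format.DateTimeFormatter

    // Hypothetical helper: one "yyyy-MM-dd HH:mm:ss" key per hour in [from, to].
    def hourlyKeys(from: LocalDateTime, to: LocalDateTime): List[String] = {
      val fmt = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")
      Iterator.iterate(from)(_.plusHours(1))
        .takeWhile(!_.isAfter(to))
        .map(_.format(fmt))
        .toList
    }

    val data = hourlyKeys(
      LocalDateTime.of(2015, 10, 29, 12, 0),
      LocalDateTime.of(2015, 10, 29, 15, 0))
    // data == List("2015-10-29 12:00:00", ..., "2015-10-29 15:00:00")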

Answered Sep 22 '22 by Reshef