I am trying to optimize my Spark job by avoiding shuffling as much as possible.
I am using cassandraTable to create the RDD.
The column family's column names are dynamic, so it is defined as follows:
CREATE TABLE "Profile" (
key text,
column1 text,
value blob,
PRIMARY KEY (key, column1)
) WITH COMPACT STORAGE AND
bloom_filter_fp_chance=0.010000 AND
caching='ALL' AND
...
This definition results in CassandraRow RDD elements in the following format:
CassandraRow <key, column1, value>
So if I have the row key 'profile1', with columns name='George' and age='34', the resulting RDD will be:
CassandraRow<key=profile1, column1=name, value=George>
CassandraRow<key=profile1, column1=age, value=34>
Then I need to group elements that share the same key together to get a PairRdd:
PairRdd<String, Iterable<CassandraRow>>
It is important to note that all the elements I need to group are on the same Cassandra node (they share the same row key), so I expect the connector to preserve data locality.
The problem is that using groupBy or groupByKey causes shuffling. I would rather group them locally, because all the data is on the same node:
JavaPairRDD<String, Iterable<CassandraRow>> rdd = javaFunctions(context)
        .cassandraTable(ks, "Profile")
        .groupBy(new Function<CassandraRow, String>() {
            @Override
            public String call(CassandraRow row) throws Exception {
                return row.getString("key");
            }
        });
My question is: how can I group these rows by key without causing a shuffle, so that the grouping stays local to the node that holds the data?
Thanks,
Shai
To connect Spark to a Cassandra cluster, the Cassandra Connector needs to be added to the Spark project. DataStax provides its own Cassandra Connector on GitHub, and we will use that. Building the connector (for example with sbt) should output compiled jar files to a directory named "target", which can then be added to the Spark classpath.
The good news is that in many cases the Cassandra Connector will take care of this for you automatically. When you use the Spark Cassandra Connector, it automatically creates Spark partitions aligned with the Cassandra partition key.
You can always use Spark's repartition() method before writing to Cassandra to try to achieve data locality, but this is slow and overkill, since the Spark Cassandra Connector already does this under the hood much more efficiently. The connector also batches the data for you in an optimal way.
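For example, writing an RDD back to Cassandra through the connector's Java API lets the connector handle partitioning and batching for you. This is only a minimal sketch: the profilesRdd variable, the Profile bean and the "ks" keyspace are illustrative placeholders, not part of the question.
import static com.datastax.spark.connector.japi.CassandraJavaUtil.javaFunctions;
import static com.datastax.spark.connector.japi.CassandraJavaUtil.mapToRow;

// The connector groups rows by partition key and batches writes per node,
// so no manual repartition() is needed before saving.
// "ks", "Profile" and the Profile bean are illustrative placeholders.
javaFunctions(profilesRdd)
        .writerBuilder("ks", "Profile", mapToRow(Profile.class))
        .saveToCassandra();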
Be aware that Spark will compete with Cassandra for I/O: Spark HDFS writes are heavy I/O operations and can slow down and starve your Cassandra cluster. The rest of the article focuses mainly on running Spark with Cassandra in the same cluster, although many of the optimizations also apply if you run them in different clusters.
I think you are looking for spanByKey, a Cassandra-connector-specific operation that takes advantage of the ordering provided by Cassandra to group elements without incurring a shuffle stage.
In your case, it should look like:
sc.cassandraTable("keyspace", "Profile")
  .keyBy(row => row.getString("key"))
  .spanByKey
Read more in the docs:
https://github.com/datastax/spark-cassandra-connector/blob/master/doc/3_selection.md#grouping-rows-by-partition-key
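Since your snippet is in Java: newer versions of the connector also expose spanBy through the Java API. Assuming your connector version provides it (the exact signature may differ, so check the Java API docs for your version), a rough sketch would be:
import static com.datastax.spark.connector.japi.CassandraJavaUtil.javaFunctions;

import com.datastax.spark.connector.japi.CassandraRow;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;

// Read the table, then span (rather than shuffle) rows that share the same partition key.
// Assumes the connector's Java API exposes spanBy in your version; verify the signature.
JavaRDD<CassandraRow> rows = javaFunctions(context).cassandraTable(ks, "Profile");

JavaPairRDD<String, Iterable<CassandraRow>> byKey =
        javaFunctions(rows).spanBy(new Function<CassandraRow, String>() {
            @Override
            public String call(CassandraRow row) throws Exception {
                return row.getString("key");
            }
        }, String.class);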