
Spark Cassandra Connector keyBy and shuffling

I am trying to optimize my Spark job by avoiding shuffling as much as possible.

I am using cassandraTable to create the RDD.

The column family's column names are dynamic, so it is defined as follows:

CREATE TABLE "Profile" (
  key text,
  column1 text,
  value blob,
  PRIMARY KEY (key, column1)
) WITH COMPACT STORAGE AND
  bloom_filter_fp_chance=0.010000 AND
  caching='ALL' AND
  ...

This definition results in CassandraRow RDD elements in the following format:

CassandraRow <key, column1, value>
  • key - the row key
  • column1 - the name of the dynamic column
  • value - the value of the dynamic column

So if I have RK='profile1' with columns name='George' and age='34', the resulting RDD will be:

CassandraRow<key=profile1, column1=name, value=George>
CassandraRow<key=profile1, column1=age, value=34>
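
(For illustration, the two rows above could come from inserts like these; textAsBlob converts the text literal into the blob-typed value column:)

INSERT INTO "Profile" (key, column1, value) VALUES ('profile1', 'name', textAsBlob('George'));
INSERT INTO "Profile" (key, column1, value) VALUES ('profile1', 'age', textAsBlob('34'));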

Then I need to group elements that share the same key together to get a pair RDD:

JavaPairRDD<String, Iterable<CassandraRow>>

It is important to note that all the elements I need to group are on the same Cassandra node (they share the same row key), so I expect the connector to preserve data locality.

The problem is that using groupBy or groupByKey causes shuffling. I would rather group them locally, because all the data is on the same node:

JavaPairRDD<String, Iterable<CassandraRow>> rdd = javaFunctions(context)
        .cassandraTable(ks, "Profile")
        // groupBy triggers a shuffle, even though all rows sharing a key
        // already live in the same Cassandra node
        .groupBy(new Function<CassandraRow, String>() {
            @Override
            public String call(CassandraRow row) throws Exception {
                return row.getString("key");
            }
        });

My questions are:

  1. Will using keyBy on the RDD cause shuffling, or will it keep the data local?
  2. Is there a way to group the elements by key without shuffling? I read about mapPartitions, but didn't quite understand how to use it; a rough sketch of what I had in mind follows below.
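
For reference, this is roughly the mapPartitions direction I was considering: bucket the rows of each Spark partition by row key, with no shuffle. This is only a sketch; it assumes the connector keeps all rows of one Cassandra partition key inside a single Spark partition (they map to a single token range), and the Spark 1.x Java API, where PairFlatMapFunction.call returns an Iterable:

import static com.datastax.spark.connector.japi.CassandraJavaUtil.javaFunctions;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.function.PairFlatMapFunction;

import com.datastax.spark.connector.japi.CassandraRow;

import scala.Tuple2;

// Group rows per Spark partition only, without a shuffle stage.
JavaPairRDD<String, Iterable<CassandraRow>> grouped = javaFunctions(context)
        .cassandraTable(ks, "Profile")
        .mapPartitionsToPair(
            new PairFlatMapFunction<Iterator<CassandraRow>, String, Iterable<CassandraRow>>() {
                @Override
                public Iterable<Tuple2<String, Iterable<CassandraRow>>> call(
                        Iterator<CassandraRow> rows) {
                    // Bucket the rows of this partition by their row key.
                    Map<String, List<CassandraRow>> buckets =
                            new HashMap<String, List<CassandraRow>>();
                    while (rows.hasNext()) {
                        CassandraRow row = rows.next();
                        String key = row.getString("key");
                        List<CassandraRow> bucket = buckets.get(key);
                        if (bucket == null) {
                            bucket = new ArrayList<CassandraRow>();
                            buckets.put(key, bucket);
                        }
                        bucket.add(row);
                    }
                    // Emit one (key, rows) pair per bucket.
                    List<Tuple2<String, Iterable<CassandraRow>>> out =
                            new ArrayList<Tuple2<String, Iterable<CassandraRow>>>();
                    for (Map.Entry<String, List<CassandraRow>> e : buckets.entrySet()) {
                        out.add(new Tuple2<String, Iterable<CassandraRow>>(
                                e.getKey(), e.getValue()));
                    }
                    return out;
                }
            });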

Thanks,

Shai

asked Mar 11 '15 by Shai


1 Answer

I think you are looking for spanByKey, a cassandra-connector specific operation that takes advantage of the ordering provided by Cassandra to allow grouping of elements without incurring a shuffle stage.

In your case, it should look like:

sc.cassandraTable("keyspace", "Profile")
  .keyBy(row => row.getString("key")) // key each row by its Cassandra partition key
  .spanByKey                          // group consecutive rows sharing a key, no shuffle
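
spanByKey can avoid the shuffle because the connector returns the rows of each Cassandra partition consecutively, so every group can be assembled in a single local pass over the Spark partition.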

Read more in the docs:
https://github.com/datastax/spark-cassandra-connector/blob/master/doc/3_selection.md#grouping-rows-by-partition-key
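
Since the question's code is Java, a hypothetical equivalent using the connector's Java API could look like the sketch below. It assumes the Java API's spanBy(f, keyClass) helper described in the connector documentation; check the docs for the exact signature in your connector version:

import static com.datastax.spark.connector.japi.CassandraJavaUtil.javaFunctions;

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;

import com.datastax.spark.connector.japi.CassandraRow;

// Read the table, then span (not shuffle) the rows by their partition key.
JavaRDD<CassandraRow> rows = javaFunctions(context).cassandraTable(ks, "Profile");
JavaPairRDD<String, Iterable<CassandraRow>> grouped =
        javaFunctions(rows).spanBy(new Function<CassandraRow, String>() {
            @Override
            public String call(CassandraRow row) throws Exception {
                return row.getString("key");
            }
        }, String.class);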

answered Sep 24 '22 by maasg