
Spark Cassandra connector - Range query on partition key

I'm evaluating the spark-cassandra-connector and I'm struggling to get a range query on the partition key to work.

According to the connector's documentation, it seems possible to do server-side filtering on the partition key using the equality or IN operators, but unfortunately my partition key is a timestamp, so I cannot use them.

So I tried using Spark SQL with the following query ('timestamp' is the partition key):

select * from datastore.data where timestamp >= '2013-01-01T00:00:00.000Z' and timestamp < '2013-12-31T00:00:00.000Z'

Although the job spawns 200 tasks, the query is not returning any data.
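For reference, a minimal sketch of how such a query can be submitted through the connector's Spark SQL integration (assuming the CassandraSQLContext shipped with the b1.1 connector; keyspace and table as above):

import org.apache.spark.sql.cassandra.CassandraSQLContext

// Assumes an existing SparkContext `sc`.
val cc = new CassandraSQLContext(sc)
val results = cc.sql(
  "select * from datastore.data " +
  "where timestamp >= '2013-01-01T00:00:00.000Z' and timestamp < '2013-12-31T00:00:00.000Z'")
results.collect().foreach(println)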

I can also confirm that there is data to be returned, since running the query in cqlsh (with the appropriate conversion using the 'token' function) DOES return data.

I'm using Spark 1.1.0 in standalone mode. Cassandra is 2.1.2 and the connector is the 'b1.1' branch. The Cassandra driver is the DataStax 'master' branch. The Cassandra cluster is overlaid on the Spark cluster, with 3 servers and a replication factor of 1.

Here is the job's full log

Any clue, anyone?

Update: When trying to do server-side filtering based on the partition key (using the CassandraRDD.where method), I get the following exception:

Exception in thread "main" java.lang.UnsupportedOperationException: Range predicates on partition key columns (here: timestamp) are not supported in where. Use filter instead.

But unfortunately I don't know what "filter" is...
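For context, the call that triggers this exception looks something like the following sketch (table and bounds as above):

import com.datastax.spark.connector._

// `where` pushes the clause down to CQL on the server side, which
// rejects range predicates on partition key columns.
val rdd = sc.cassandraTable("datastore", "data")
  .where("timestamp >= ? and timestamp < ?",
    "2013-01-01T00:00:00.000Z", "2013-12-31T00:00:00.000Z")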

asked Nov 19 '14 by thiago

People also ask

How to create Spark partitions aligned to a Cassandra partition?

The good news is that in many cases the Cassandra connector will take care of this for you automatically. When you use the Cassandra Spark connector, it automatically creates Spark partitions aligned to the Cassandra partition key!
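A quick way to observe this, assuming an existing SparkContext `sc` and the keyspace/table from the question:

import com.datastax.spark.connector._

// Each Spark partition created by the connector maps to a group of
// Cassandra token ranges, so reads stay local to the owning nodes.
val rdd = sc.cassandraTable("datastore", "data")
println(rdd.partitions.length)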

What is batch grouping in spark Cassandra?

“spark.cassandra.output.batch.grouping.key”: the Cassandra connector groups write batches based on this value. The default is partition, so it groups writes so that each batch goes to the same partition and, in turn, the same node. You can also set it to replica_set.
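For example, a minimal sketch of applying the setting (the host value is a placeholder):

import org.apache.spark.SparkConf

// Group write batches by replica set instead of the default partition-key grouping.
val conf = new SparkConf()
  .set("spark.cassandra.connection.host", "127.0.0.1")
  .set("spark.cassandra.output.batch.grouping.key", "replica_set")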

How to pull more data into spark from Cassandra?

When reading data, the connector sizes partitions based on an estimate of the Spark data size. You can increase “spark.cassandra.input.split.sizeInMB” if you want to pull more data into Spark; however, be careful not to hold too much data, or you will run into memory issues.
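Again as a sketch, with an arbitrary example value:

import org.apache.spark.SparkConf

// Raise the target split size so each Spark partition reads more data
// from Cassandra (128 MB here is just an example).
val conf = new SparkConf()
  .set("spark.cassandra.input.split.sizeInMB", "128")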

What are clustering keys in Cassandra?

Clustering keys are columns we add to the primary key; they define the order of rows within a partition. In this case, we're sorting rows by release year. The partition key and the clustering keys together make up the primary key, and that is, if you will, a key part of table design in Cassandra. By default, Cassandra uses the first column of the primary key as the partition key.
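As an illustration, a hypothetical schema (not the asker's table) where rows within each artist partition are ordered by release year:

-- 'artist' is the partition key; 'release_year' is the clustering key
CREATE TABLE music.albums (
    artist text,
    release_year int,
    title text,
    PRIMARY KEY ((artist), release_year)
) WITH CLUSTERING ORDER BY (release_year DESC);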


2 Answers

You have several options to get the solution you are looking for.

The most powerful one would be to use the Lucene indexes that Stratio has integrated with Cassandra, which allow you to search by any indexed field on the server side. Your write time will increase but, on the other hand, you will be able to query any time range. You can find further information about Lucene indexes in Cassandra here. This extended version of Cassandra is fully integrated into the deep-spark project, so you can take advantage of the Lucene indexes in Cassandra through it. I would recommend using Lucene indexes when you are executing a restricted query that retrieves a small-to-medium result set; if you are going to retrieve a big piece of your data set, you should use the third option below.

Another approach, depending on how your application works, might be to truncate your timestamp field so you can look it up using an IN operator. The problem is that, as far as I know, you can't use the spark-cassandra-connector for that; you would have to use the direct Cassandra driver, which is not integrated with Spark, or you can have a look at the deep-spark project, where a new feature allowing this is about to be released very soon. Your query would look something like this:

select * from datastore.data where timestamp IN ('2013-01-01', '2013-01-02', '2013-01-03', '2013-01-04', ... , '2013-12-31')

But, as I said before, I don't know if this fits your needs, since you might not be able to truncate your data and group it by date/time.
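For completeness, a sketch of issuing such a query through the plain DataStax Java driver from Scala (the contact point and the shortened IN list are placeholders):

import com.datastax.driver.core.Cluster
import scala.collection.JavaConverters._

// Plain driver session, independent of Spark.
val cluster = Cluster.builder().addContactPoint("127.0.0.1").build()
val session = cluster.connect("datastore")
val rows = session.execute(
  "select * from data where timestamp IN ('2013-01-01', '2013-01-02', '2013-01-03')")
rows.asScala.foreach(println)
cluster.close()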

The last option you have, though the least efficient, is to bring the full data set to your Spark cluster and apply a filter on the RDD.

Disclaimer: I work for Stratio :-) Don't hesitate to contact us if you need any help.

I hope it helps!

answered Jan 04 '23 by opuertas

I think the CassandraRDD error is telling you that the query you are trying to run is not allowed in Cassandra, and that you have to load the whole table into a CassandraRDD and then apply a Spark filter operation over this CassandraRDD.

So your code (in Scala) should look something like this:

val fmt = new java.text.SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'")
val (from, to) = (fmt.parse("2013-01-01T00:00:00.000Z"), fmt.parse("2013-12-31T00:00:00.000Z"))
val cassRDD = sc.cassandraTable("keyspace name", "table name")
  .filter(row => !row.getDate("timestamp").before(from) && row.getDate("timestamp").before(to))

If you are interested in making this type of query, you might want to take a look at other Cassandra connectors, like the one developed by Stratio.

answered Jan 04 '23 by jlopezmat