Spark joinWithCassandraTable() on map multiple partition key ERROR

I'm trying to filter on a small part of a huge Cassandra table by using:

val snapshotsFiltered = sc.parallelize(startDate to endDate).map(TableKey(_2)).joinWithCassandraTable("listener","snapshots_test_b")

I want to map the rows of the Cassandra table by the 'created' column, which is part of the partition key.

My table key (the partition key of the table) is defined as:

case class TableKey(imei: String, created: Long, when: Long)

The result is an error:

[error] /home/ubuntu/scala/test/test.scala:61: not enough arguments for method apply: (imei: String, created: Long)test.TableKey in object TableKey.
[error] Unspecified value parameter created.
[error] val snapshotsFiltered = sc.parallelize(startDate to endDate).map(TableKey(_2)).joinWithCassandraTable("listener","snapshots_test_b")
[error] ^
[error] one error found
[error] (compile:compile) Compilation failed

It worked with only one column in the partition key, as in the documentation.

Why is there a problem with a multi-column partition key? (answered below)

EDIT: I tried to use joinWithCassandraTable in the right form:

val snapshotsFiltered = sc.parallelize(startDate to endDate).map(TableKey("*",_,startDate)).joinWithCassandraTable("listener","snapshots_test_c")

When I try to run it on Spark there are no errors, but it gets stuck on "[stage 0:> (0+2)/2]" forever...

What is going wrong?

asked Aug 02 '15 by Reshef

1 Answer

The error is telling you that the class TableKey requires three arguments to construct, yet only a single argument was passed. This is a Scala compilation error and isn't related to C* or Spark.

 val snapshotsFiltered = sc.parallelize(startDate to endDate)
   .map(TableKey(_2))  // TableKey does not have a single-argument constructor, so this will not compile
   .joinWithCassandraTable("listener","snapshots_test_b")
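For the call to compile, all three fields of TableKey have to be supplied. A minimal sketch, assuming the imei and when values are known up front (the literal values here are placeholders, not from the original question):

 val snapshotsFiltered = sc.parallelize(startDate to endDate)
   .map(created => TableKey("356938035643809", created, created))  // placeholder imei and when; all three key fields supplied
   .joinWithCassandraTable("listener","snapshots_test_b")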

In general though, C* uses the entire partition key to determine where a particular row lives. Because of this, you can only efficiently pull out data if you know the entire partition key, so passing only a portion of it has no value.

joinWithCassandraTable requires full partition key values so it can efficiently do its work. If you only have a portion of the partition key, you will have to perform a full table scan and filter in Spark.
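As a sketch of that fallback, assuming the created column is a CQL bigint (getLong is part of the connector's CassandraRow API; the bounds come from the question's startDate and endDate):

 import com.datastax.spark.connector._  // provides sc.cassandraTable

 val snapshotsFiltered = sc.cassandraTable("listener","snapshots_test_b")
   .filter(row => {
     val created = row.getLong("created")        // read the key column from each row
     created >= startDate && created <= endDate  // Spark-side range filter; every partition is scanned
   })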

If you only want to filter on a clustering column, you can do so by pushing a where clause down to C*, such as:

sc.cassandraTable("ks","test").where("clustering_key > someValue")
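The where clause is sent to Cassandra as CQL, so it can be combined with select to trim the fetch server-side. A sketch, with hypothetical column names:

 sc.cassandraTable("ks","test")
   .select("partition_key", "clustering_key")  // fetch only the columns that are needed
   .where("clustering_key > ?", someValue)     // predicate is pushed down to Cassandra as CQL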
answered Oct 23 '22 by RussS