I'm trying to filter on a small part of a huge Cassandra table by using:
val snapshotsFiltered = sc.parallelize(startDate to endDate).map(TableKey(_2)).joinWithCassandraTable("listener","snapshots_test_b")
I want to map the rows of the Cassandra table by the 'created' column, which is part of the partition key.
My table key (the partition key of the table) is defined as:
case class TableKey(imei: String, created: Long, when: Long)
The result is an error:
[error] /home/ubuntu/scala/test/test.scala:61: not enough arguments for method apply: (imei: String, created: Long)test.TableKey in object TableKey.
[error] Unspecified value parameter created.
[error] val snapshotsFiltered = sc.parallelize(startDate to endDate).map(TableKey(_2)).joinWithCassandraTable("listener","snapshots_test_b")
[error] ^
[error] one error found
[error] (compile:compile) Compilation failed
It worked when the partition key contained only one column, as in the documentation.
Why is there a problem with a composite partition key? (answered below)
EDIT: I tried to use joinWithCassandraTable in the correct form:
val snapshotsFiltered = sc.parallelize(startDate to endDate).map(TableKey("*",_,startDate)).joinWithCassandraTable("listener","snapshots_test_c")
When I try to run it on Spark there are no errors, but it gets stuck on "[stage 0:> (0+2)/2]" forever...
What is going wrong?
The error is telling you that the class TableKey
requires three arguments to construct, but only a single argument was passed. This is a Scala compilation error and isn't related to C* or Spark.
val snapshotsFiltered = sc.parallelize(startDate to endDate)
.map(TableKey(_2)) // TableKey does not have a single-argument constructor, so this fails to compile
.joinWithCassandraTable("listener","snapshots_test_b")
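For reference, a hedged sketch of a version that compiles: every field of TableKey has to be given a concrete value, since C* cannot wildcard any component of a partition key. The imei list and the 'when' placeholder below are hypothetical and would have to come from your own data:

import com.datastax.spark.connector._

// Hypothetical values -- the connector cannot discover these for you.
val imeis = Seq("350000000000001", "350000000000002")
val someWhen = 0L // hypothetical placeholder for the 'when' component of the key

val keys = for {
  imei    <- imeis
  created <- startDate to endDate
} yield TableKey(imei, created, someWhen)

val snapshotsFiltered = sc.parallelize(keys)
  .joinWithCassandraTable("listener", "snapshots_test_b")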
In general, though, C* uses the entire partition key
to determine where a particular row lives. Because of this, you can only pull data out efficiently if you know the entire partition key,
so passing only a portion of it has no value.
joinWithCassandraTable requires full partition key
values so it can do its work efficiently. If you only have a portion of the partition key,
you will have to perform a full table scan and use Spark to filter, as in the sketch below.
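A rough sketch of that fallback, assuming the column names match the TableKey fields above and that startDate/endDate are the same bounds as in the question:

import com.datastax.spark.connector._

// Full table scan: every row is shipped from C* to Spark and filtered there.
// Expect this to be much slower than a keyed join on a large table.
val snapshotsFiltered = sc.cassandraTable("listener", "snapshots_test_b")
  .filter { row =>
    val created = row.getLong("created")
    created >= startDate && created <= endDate
  }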
If you only want to filter based on a clustering column,
you can do so by pushing down a where
clause to C*, such as:
sc.cassandraTable("ks","test").where("clustering_key > someValue")
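The where clause here is translated into a CQL predicate and executed by C* itself, so only the matching rows are shipped back to Spark. It only works for predicates C* can serve natively, such as restrictions on clustering columns.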