I have a problem when I use Spark Streaming to read from Cassandra.
https://github.com/datastax/spark-cassandra-connector/blob/master/doc/8_streaming.md#reading-from-cassandra-from-the-streamingcontext
As in the link above, I use
val rdd = ssc.cassandraTable("streaming_test", "key_value").select("key", "value").where("fu = ?", 3)
to select the data from Cassandra, but it seems that Spark Streaming runs the query only once, while I want it to keep querying on a 10-second interval.
My code is as follows. Thanks for any response!
import org.apache.spark._
import org.apache.spark.streaming._
import com.datastax.spark.connector.streaming._
import org.apache.spark.rdd._
import scala.collection.mutable.Queue

object SimpleApp {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("scala_streaming_test").set("spark.cassandra.connection.host", "127.0.0.1")
    val ssc = new StreamingContext(conf, Seconds(10))

    val rdd = ssc.cassandraTable("mykeyspace", "users").select("fname", "lname").where("lname = ?", "yu")
    //rdd.collect().foreach(println)

    val rddQueue = new Queue[RDD[com.datastax.spark.connector.CassandraRow]]()
    val dstream = ssc.queueStream(rddQueue)
    dstream.print()

    ssc.start()
    rdd.collect().foreach(println)
    rddQueue += rdd
    ssc.awaitTermination()
  }
}
Spark Streaming provides a high-level abstraction called a discretized stream, or DStream, which represents a continuous stream of data. DStreams can be created either from input data streams from sources such as Kafka and Kinesis, or by applying high-level operations on other DStreams.
You can create a ConstantInputDStream with the CassandraRDD as input. ConstantInputDStream will provide the same RDD on each streaming interval, and by executing an action on that RDD you will trigger a materialization of the RDD lineage, leading to executing the query on Cassandra every time.
Make sure the data being queried does not grow unbounded, otherwise query times will keep increasing and the streaming process will become unstable.
Something like this should do the trick (using your code as starting point):
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.ConstantInputDStream
import com.datastax.spark.connector.streaming._

// `conf` is the same SparkConf as in your code
val ssc = new StreamingContext(conf, Seconds(10))

val cassandraRDD = ssc.cassandraTable("mykeyspace", "users").select("fname", "lname").where("lname = ?", "yu")

val dstream = new ConstantInputDStream(ssc, cassandraRDD)

dstream.foreachRDD { rdd =>
  // any action will trigger the underlying Cassandra query; using collect to have a simple output
  println(rdd.collect.mkString("\n"))
}

ssc.start()
ssc.awaitTermination()
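If the table can grow over time, one way to keep each micro-batch bounded is to cap the rows fetched per interval with the connector's limit. This is only a sketch of my own, reusing the ssc and imports above; the 1000-row cap is an arbitrary illustrative value:

// Sketch: cap the rows fetched per streaming interval so each query stays bounded.
// The 1000-row cap is arbitrary; tune it to your data volume.
val boundedRDD = ssc.cassandraTable("mykeyspace", "users")
  .select("fname", "lname")
  .where("lname = ?", "yu")
  .limit(1000)

val boundedStream = new ConstantInputDStream(ssc, boundedRDD)
boundedStream.foreachRDD(rdd => println(rdd.collect().mkString("\n")))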
I had the same issue and found a solution by creating a subclass of the InputDStream class. It is necessary to define the start() and compute() methods.
start() can be used for preparation. The main logic resides in compute(), which must return an Option[RDD[T]].
To make the class flexible, an InputStreamQuery trait is defined.
trait InputStreamQuery[T] {
  // where clause condition for partition key
  def partitionCond : (String, Any)
  // function to return next partition key
  def nextValue(v: Any) : Option[Any]
  // where clause condition for clustering key
  def whereCond : (String, (T) => Any)
  // batch size
  def batchSize : Int
}
For the Cassandra table keyspace.test, create a materialized view test_by_date which reorganizes the table by the partitioning key date.
CREATE TABLE IF NOT EXISTS keyspace.test (
  id timeuuid,
  date text,
  value text,
  PRIMARY KEY (id)
);

CREATE MATERIALIZED VIEW IF NOT EXISTS keyspace.test_by_date AS
  SELECT *
  FROM keyspace.test
  WHERE date IS NOT NULL AND id IS NOT NULL
  PRIMARY KEY (date, id)
  WITH CLUSTERING ORDER BY (id ASC);
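For illustration only (not part of the original answer), here is a hedged sketch of seeding the base table from Spark with a few made-up rows; UUIDs.timeBased() from the DataStax Java driver bundled with the connector produces the timeuuid for id, and Cassandra keeps the materialized view up to date automatically:

import com.datastax.driver.core.utils.UUIDs
import com.datastax.spark.connector._
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical seed rows (values made up) written to the base table keyspace.test.
case class SeedRow(id: java.util.UUID, date: String, value: String)

val sc = new SparkContext(
  new SparkConf()
    .setAppName("seed")
    .setMaster("local[*]") // for local testing only
    .set("spark.cassandra.connection.host", "127.0.0.1"))

val sample = sc.parallelize(Seq(
  SeedRow(UUIDs.timeBased(), "2017-10-01", "a"),
  SeedRow(UUIDs.timeBased(), "2017-10-01", "b"),
  SeedRow(UUIDs.timeBased(), "2017-10-02", "c")
))

sample.saveToCassandra("keyspace", "test", SomeColumns("id", "date", "value"))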
One possible implementation for the test table is:
case class Test(id: UUID, date: String, value: String)
trait InputStreamQueryTest extends InputStreamQuery[Test] {
  val dateFormat = "uuuu-MM-dd"

  // set batch size as 10 records
  override def batchSize: Int = 10

  // partitioning key condition: query string and initial value
  override def partitionCond: (String, Any) = ("date = ?", "2017-10-01")

  // clustering key condition: query string and function to get the clustering key from an instance
  override def whereCond: (String, Test => Any) = (" id > ?", m => m.id)

  // return the next value of the partitioning key, e.g. '2017-10-02' for input value '2017-10-01'
  override def nextValue(v: Any): Option[Any] = {
    import java.time.LocalDate
    import java.time.format.DateTimeFormatter
    val formatter = DateTimeFormatter.ofPattern(dateFormat)
    val nextDate = LocalDate.parse(v.asInstanceOf[String], formatter).plusDays(1)
    if (nextDate.isAfter(LocalDate.now())) None
    else Some(nextDate.format(formatter))
  }
}
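As a quick sanity check of the date-advancing logic, here is a standalone sketch (the object name is made up, and the expected output assumes today's date is later than the sample dates):

// Hypothetical check: nextValue walks the date partition forward one day at a time
// and returns None once the next date would be after today.
object NextValueDemo extends InputStreamQueryTest {
  def main(args: Array[String]): Unit = {
    println(nextValue("2017-10-01")) // Some(2017-10-02)
    println(nextValue("2017-10-02")) // Some(2017-10-03)
  }
}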
It can be used in the CassandraInputStream class as follows.
import scala.reflect.ClassTag

import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{StreamingContext, Time}
import org.apache.spark.streaming.dstream.InputDStream

import com.datastax.spark.connector._
import com.datastax.spark.connector.rdd.{CassandraTableScanRDD, ValidRDDType}
import com.datastax.spark.connector.rdd.reader.RowReaderFactory

class CassandraInputStream[T: ClassTag]
    (_ssc: StreamingContext, keyspace: String, table: String)
    (implicit rrf: RowReaderFactory[T], ev: ValidRDDType[T])
  extends InputDStream[T](_ssc) with InputStreamQuery[T] {

  var lastElm: Option[T] = None
  var partitionKey: Any = _

  override def start(): Unit = {
    // find a partition key which stores some records
    def findStartValue(cql: String, value: Any): Any = {
      val rdd = _ssc.sparkContext.cassandraTable[T](keyspace, table).where(cql, value).limit(1)

      if (rdd.cassandraCount() > 0) value
      else {
        nextValue(value).map(findStartValue(cql, _)).getOrElse(value)
      }
    }
    // get the query string and initial value from the partitionCond method
    val (cql, value) = partitionCond
    partitionKey = findStartValue(cql, value)
  }

  override def stop(): Unit = {}

  override def compute(validTime: Time): Option[RDD[T]] = {
    val (cql, _) = partitionCond
    val (wh, whKey) = whereCond

    def fetchNext(patKey: Any): Option[CassandraTableScanRDD[T]] = {
      // query with the partitioning condition
      val query = _ssc.sparkContext.cassandraTable[T](keyspace, table).where(cql, patKey)

      val rdd = lastElm.map { x =>
        query.where(wh, whKey(x)).withAscOrder.limit(batchSize)
      }.getOrElse(query.withAscOrder.limit(batchSize))

      if (rdd.cassandraCount() > 0) {
        // store the last element of this RDD
        lastElm = Some(rdd.collect.last)
        Some(rdd)
      }
      else {
        // find the next partition key which stores data
        nextValue(patKey).flatMap { k =>
          partitionKey = k
          fetchNext(k)
        }
      }
    }

    fetchNext(partitionKey)
  }
}
Combining all the classes,
val conf = new SparkConf().setAppName(appName).setMaster(master)
val ssc = new StreamingContext(conf, Seconds(10))

val dstream = new CassandraInputStream[Test](ssc, "keyspace", "test_by_date") with InputStreamQueryTest

// print each batch and/or write it on to another table
dstream.foreachRDD(rdd => rdd.foreach(println))
dstream.saveToCassandra( ... )

ssc.start()
ssc.awaitTermination()