
SQL over Spark Streaming

This is the code to run simple SQL queries over Spark Streaming.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.sql.SQLContext

object StreamingSQL {

  case class Persons(name: String, age: Int)

  def main(args: Array[String]) {

    val sparkConf = new SparkConf().setMaster("local").setAppName("HdfsWordCount")
    val sc = new SparkContext(sparkConf)
    // Create the context
    val ssc = new StreamingContext(sc, Seconds(2))

    val lines = ssc.textFileStream("C:/Users/pravesh.jain/Desktop/people/")
    lines.foreachRDD(rdd=>rdd.foreach(println))

    val sqc = new SQLContext(sc)
    import sqc.createSchemaRDD

    // Create the FileInputDStream on the directory and use the
    // stream to count words in new files created

    lines.foreachRDD(rdd=>{
      rdd.map(_.split(",")).map(p => Persons(p(0), p(1).trim.toInt)).registerAsTable("data")
      val teenagers = sqc.sql("SELECT name FROM data WHERE age >= 13 AND age <= 19")
      teenagers.foreach(println)
    })

    ssc.start()
    ssc.awaitTermination()
  }
}

As you can see, to run SQL over streaming, the query has to be made inside the foreachRDD method. I want to run SQL join over data received from two different streams. Is there any way that can be done?

Pravesh Jain asked Aug 25 '14

2 Answers

Well, I would like to sum up the workaround we arrived at after the discussion in Spiro's answer. His suggestion to first create an empty table and then insert RDDs into it was spot on. The only problem is that Spark doesn't yet allow inserting into tables registered directly from in-memory RDDs. Here is what can be done:

First, create an RDD that has the same schema as the one you're expecting from your stream:

// assuming a case class matching the schema you expect from the stream, e.g.
// case class Rec(name: String, age: Int)
import sqlContext.createSchemaRDD
val d1 = sc.parallelize(Array(("a", 10), ("b", 3))).map(e => Rec(e._1, e._2))

Then save it as a Parquet file:

d1.saveAsParquetFile("/home/p1.parquet")

Now, load the Parquet file and register it as a table using the registerAsTable() method:

val parquetFile = sqlContext.parquetFile("/home/p1.parquet")
parquetFile.registerAsTable("data")

Now, when you receive your stream, just apply foreachRDD() to it and keep inserting the individual RDDs into the table created above using the insertInto() method:

dStream.foreachRDD(rdd => {
  rdd.insertInto("data")
})

This insertInto() works fine and lets the data accumulate in the table. You can do the same for any number of streams and then run your queries over the accumulated tables.
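Putting the whole workaround together for two streams might look something like the following sketch. The paths, table names, the Rec case class, and the join condition here are illustrative assumptions, not part of the original code:

```scala
// Hypothetical sketch: accumulate two streams into two Parquet-backed
// tables and join them with plain SQL. Schema, paths, and names are
// assumptions for illustration.
case class Rec(name: String, age: Int)

// seed each table with an RDD of the expected schema, saved as Parquet
val seed = sc.parallelize(Array(Rec("seed", 0)))
seed.saveAsParquetFile("/home/s1.parquet")
seed.saveAsParquetFile("/home/s2.parquet")

sqlContext.parquetFile("/home/s1.parquet").registerAsTable("stream1")
sqlContext.parquetFile("/home/s2.parquet").registerAsTable("stream2")

// insert each incoming RDD into its stream's table
dStream1.foreachRDD(rdd =>
  rdd.map(_.split(",")).map(p => Rec(p(0), p(1).trim.toInt)).insertInto("stream1"))
dStream2.foreachRDD(rdd =>
  rdd.map(_.split(",")).map(p => Rec(p(0), p(1).trim.toInt)).insertInto("stream2"))

// a plain SQL join over the two accumulated tables
val joined = sqlContext.sql(
  "SELECT s1.name, s1.age FROM stream1 s1 JOIN stream2 s2 ON s1.name = s2.name")
```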

Pravesh Jain answered Oct 31 '22


The way you've written your code, you end up producing a sequence of little SchemaRDDs each time you run a SQL query. The trick is to save each of these to either an accumulation RDD or an accumulation table.

First, the table approach, using insertInto:

For each of your streams, first create an empty RDD and register it as a table, obtaining an empty table. For your example, let's say you call it "allTeenagers".

Then, for each of your queries, use SchemaRDD's insertInto method to add the result to that table:

teenagers.insertInto("allTeenagers")

If you do this with both your streams, creating two separate accumulation tables, you can then join them using a plain old SQL query.
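For instance, assuming a second accumulation table named "allParents" (a hypothetical name for illustration, along with the column used in the join condition), the query could look like:

```scala
// hypothetical: join two accumulation tables with ordinary SQL;
// "allParents" and the name column are illustrative assumptions
val joined = sqc.sql(
  "SELECT t.name FROM allTeenagers t JOIN allParents p ON t.name = p.name")
```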

(Note: I haven't actually been able to get this to work, and a little searching makes me doubt that anybody else has, but I'm pretty sure I've understood the design intent of insertInto, so I think this solution is worth recording.)

Second, the unionAll approach (there's also a union method but that makes it trickier to get the types right):

This involves creating an initial RDD -- again let's call it allTeenagers.

// create initial SchemaRDD even if it's empty, so the types work out right
var allTeenagers = sqc.sql("SELECT ...")

Then, each time:

val teenagers = sqc.sql("SELECT ...")
allTeenagers = allTeenagers.unionAll(teenagers)

Needless to say, the columns of the two SchemaRDDs need to match up for unionAll to work.
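Applied to the original question's two streams, the unionAll approach might be sketched as follows. The stream variables, table names, and queries here are illustrative assumptions:

```scala
// Hypothetical sketch: accumulate query results from two streams with
// unionAll, then join the accumulated SchemaRDDs. Names are assumptions.
var allTeenagers = sqc.sql("SELECT name FROM data1 WHERE age >= 13 AND age <= 19")
var allAdults    = sqc.sql("SELECT name FROM data2 WHERE age >= 20")

stream1.foreachRDD(rdd => {
  rdd.map(_.split(",")).map(p => Persons(p(0), p(1).trim.toInt)).registerAsTable("data1")
  allTeenagers = allTeenagers.unionAll(
    sqc.sql("SELECT name FROM data1 WHERE age >= 13 AND age <= 19"))
})
stream2.foreachRDD(rdd => {
  rdd.map(_.split(",")).map(p => Persons(p(0), p(1).trim.toInt)).registerAsTable("data2")
  allAdults = allAdults.unionAll(sqc.sql("SELECT name FROM data2 WHERE age >= 20"))
})

// later: register the accumulated RDDs and join them with plain SQL
allTeenagers.registerAsTable("allTeenagers")
allAdults.registerAsTable("allAdults")
val joined = sqc.sql(
  "SELECT t.name FROM allTeenagers t JOIN allAdults a ON t.name = a.name")
```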

Spiro Michaylov answered Oct 31 '22