Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Trying to run SparkSQL over Spark Streaming

I am trying to run SQL queries over streaming data in spark. This looks pretty straight forward but when I try it, I get the error table not found : tablename>. It unable to find the table I've registered.

Using Spark SQL with batch data works fine so I'm thinking it has to do with how I'm calling streamingcontext.start(). Any ideas what is the issue? Here is the code:

import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.sql.SQLContext

object Streaming {

def main(args: Array[String]) {

    val sparkConf = new SparkConf().setMaster("local").setAppName("HdfsWordCount")
    val sc = new SparkContext(sparkConf)
    // Create the context
    val ssc = new StreamingContext(sc, Seconds(2))

    val sqc = new SQLContext(sc);
    import sqc.createSchemaRDD

    // Create the FileInputDStream on the directory and use the
    // stream to count words in new files created
    val lines = ssc.textFileStream("C:/Users/pravesh.jain/Desktop/people.txt")
    lines.foreachRDD(rdd=>rdd.map(_.split(",")).map(p => Persons(p(0), p(1).trim.toInt)).registerAsTable("data"))
    // lines.foreachRDD(rdd=>rdd.foreach(println))
    val teenagers = sqc.sql("SELECT name FROM data WHERE age >= 13 AND age <= 19")
    ssc.start()
    ssc.awaitTermination()
  }
}

Any suggestions welcome. Thanks.

like image 833
Pravesh Jain Avatar asked Aug 21 '14 05:08

Pravesh Jain


People also ask

What is the difference between Spark and Spark streaming?

Generally, Spark streaming is used for real time processing. But it is an older or rather you can say original, RDD based Spark structured streaming is the newer, highly optimized API for Spark. Users are advised to use the newer Spark structured streaming API for Spark. Hope this will clear your doubt.

What is the main disadvantage of Spark streaming?

Apache Spark Streaming ConsThere are a lot of issues with memory management and latency. There is no real-time analytics. We recommend it for the use cases where there is a five-second latency, but not for a millisecond, an IOT-based, or the detection anomaly-based. Flink as a service is much better.

Is Spark and Spark SQL same?

Spark SQL is a Spark module for structured data processing. It provides a programming abstraction called DataFrames and can also act as a distributed SQL query engine. It enables unmodified Hadoop Hive queries to run up to 100x faster on existing deployments and data.


1 Answers

Well I got to know the problem. You have to query the data within the foreachRDD function, otherwise the table is not recognized. Something like this works:

import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.sql.SQLContext
import org.apache.spark.streaming.Duration

object Mlist {

  def main(args: Array[String]) {

    val sparkConf = new SparkConf().setMaster("local").setAppName("HdfsWordCount")
    val sc = new SparkContext(sparkConf)
    // Create the context
    val ssc = new StreamingContext(sc, Seconds(2))

    val lines = ssc.textFileStream("C:/Users/pravesh.jain/Desktop/people.txt")
    lines.foreachRDD(rdd=>rdd.foreach(println))

    val sqc = new SQLContext(sc);
    import sqc.createSchemaRDD

    // Create the FileInputDStream on the directory and use the
    // stream to count words in new files created

    lines.foreachRDD(rdd=>{
      rdd.map(_.split(",")).map(p => Persons(p(0), p(1).trim.toInt)).registerAsTable("data")
      val teenagers = sqc.sql("SELECT name FROM data WHERE age >= 13 AND age <= 19")
      teenagers.foreach(println)
    })

    ssc.start()
    ssc.awaitTermination()
  }
}
like image 177
Pravesh Jain Avatar answered Oct 17 '22 01:10

Pravesh Jain