I have the following two code snippets and I wonder what the difference between them is, and which one I should use. I am using Spark 2.2.
Dataset<Row> df = sparkSession.readStream()
.format("kafka")
.load();
df.createOrReplaceTempView("table");
df.printSchema();
Dataset<Row> resultSet = df.sqlContext().sql("select value from table"); //sparkSession.sql(this.query);
StreamingQuery streamingQuery = resultSet
.writeStream()
.trigger(Trigger.ProcessingTime(1000))
.format("console")
.start();
vs
Dataset<Row> df = sparkSession.readStream()
.format("kafka")
.load();
df.createOrReplaceTempView("table");
Dataset<Row> resultSet = sparkSession.sql("select value from table"); //sparkSession.sql(this.query);
StreamingQuery streamingQuery = resultSet
.writeStream()
.trigger(Trigger.ProcessingTime(1000))
.format("console")
.start();
There is a very subtle difference between sparkSession.sql("sql query") and df.sqlContext().sql("sql query").
Please note that you can have zero, one, or more SparkSessions in a single Spark application (but it's assumed you'll have at least one, and often only one, SparkSession in a Spark SQL application).
Please also note that a Dataset is bound to the SparkSession it was created within, and that SparkSession will never change.
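A minimal Scala sketch of that binding, assuming Spark 2.x on the classpath and a local master. The object name SessionBinding and the app name are made up for illustration:

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical demo object; not part of the original question or answer.
object SessionBinding {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]") // assumption: run locally
      .appName("session-binding")
      .getOrCreate()

    val df = spark.range(5)        // created within `spark`
    val other = spark.newSession() // a second, separate session

    // The Dataset keeps a reference to the session that created it.
    assert(df.sparkSession eq spark)
    // df.sqlContext is just a wrapper around that same session.
    assert(df.sqlContext.sparkSession eq spark)
    // Creating another session does not rebind the Dataset.
    assert(!(df.sparkSession eq other))
    // Both sessions still share one SparkContext.
    assert(other.sparkContext eq spark.sparkContext)

    spark.stop()
  }
}
```

For the two snippets in the question this means both calls go through the same session, because df was created from sparkSession. They only diverge when sparkSession refers to a different session than the one that created df.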
You may be wondering why anyone would want more than one session, but that gives you a boundary between queries: you can use the same table names for different datasets in different sessions, and that is actually a very powerful feature of Spark SQL.
The following example shows the difference and hopefully will give you some idea of why it's so powerful.
scala> spark.version
res0: String = 2.3.0-SNAPSHOT
scala> :type spark
org.apache.spark.sql.SparkSession
scala> spark.sql("show tables").show
+--------+---------+-----------+
|database|tableName|isTemporary|
+--------+---------+-----------+
+--------+---------+-----------+
scala> val df = spark.range(5)
df: org.apache.spark.sql.Dataset[Long] = [id: bigint]
scala> df.sqlContext.sql("show tables").show
+--------+---------+-----------+
|database|tableName|isTemporary|
+--------+---------+-----------+
+--------+---------+-----------+
scala> val anotherSession = spark.newSession
anotherSession: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@195c5803
scala> anotherSession.range(10).createOrReplaceTempView("new_table")
scala> anotherSession.sql("show tables").show
+--------+---------+-----------+
|database|tableName|isTemporary|
+--------+---------+-----------+
|        |new_table|       true|
+--------+---------+-----------+
scala> df.sqlContext.sql("show tables").show
+--------+---------+-----------+
|database|tableName|isTemporary|
+--------+---------+-----------+
+--------+---------+-----------+
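In the transcript above, new_table is registered in anotherSession only, so it does not show up through df.sqlContext, which belongs to the original session. The same isolation lets two sessions reuse one view name for different data. A sketch of that, again assuming Spark 2.x with a local master; the object name SameViewName and the view name numbers are hypothetical:

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical demo object; not part of the original question or answer.
object SameViewName {
  // Returns the row counts that each session sees for the view "numbers".
  def counts(): (Long, Long) = {
    val spark = SparkSession.builder()
      .master("local[*]") // assumption: run locally
      .appName("same-view-name")
      .getOrCreate()
    val other = spark.newSession()

    // Same view name, different data, one per session.
    spark.range(5).createOrReplaceTempView("numbers")
    other.range(10).createOrReplaceTempView("numbers")

    // Each query resolves "numbers" in its own session's catalog.
    val first = spark.sql("select count(*) from numbers").head.getLong(0)
    val second = other.sql("select count(*) from numbers").head.getLong(0)

    spark.stop()
    (first, second)
  }

  def main(args: Array[String]): Unit = {
    val (first, second) = counts()
    // prints "first session: 5, second session: 10"
    println(s"first session: $first, second session: $second")
  }
}
```

Temporary views are session-scoped, so neither createOrReplaceTempView call overwrites the other.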