
How can I get the current SparkSession from anywhere in the code?

I have created a session in the main() function, like this:

val sparkSession = SparkSession.builder.master("local[*]").appName("Simple Application").getOrCreate()

If I want to configure the application or access its properties, I can use the local variable sparkSession within the same function.

But what if I want to access this sparkSession elsewhere in the same project, say in project/module/.../.../xxx.scala? What should I do?

asked Jun 12 '17 by PC9527


4 Answers

Once a session has been created (anywhere), you can safely use:

SparkSession.builder.getOrCreate()

to get the (same) session anywhere in the code, as long as the session is still alive. Spark maintains a single active session, so unless it was stopped or has crashed, you'll get the same one.

Edit: builder is not callable, as mentioned in the comments.
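
For example, a minimal sketch of grabbing the session from another file (the object and method names here are illustrative, not from the question):

import org.apache.spark.sql.SparkSession

object SomeOtherModule {
  // Retrieves the already-running session; only creates a new one
  // if no session exists yet.
  def currentSession: SparkSession = SparkSession.builder.getOrCreate()
}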

answered Oct 24 '22 by Tzach Zohar


Since Spark 2.2.0 you can access the active SparkSession through:

/**
 * Returns the active SparkSession for the current thread, returned by the builder.
 *
 * @since 2.2.0
 */
def getActiveSession: Option[SparkSession] = Option(activeThreadSession.get)

or the default SparkSession:

/**
 * Returns the default SparkSession that is returned by the builder.
 *
 * @since 2.2.0
 */
def getDefaultSession: Option[SparkSession] = Option(defaultSession.get)
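
A minimal usage sketch, assuming a session was already created elsewhere; both methods return an Option, so you decide how to handle the absent case:

import org.apache.spark.sql.SparkSession

// Prefer the session bound to the current thread, fall back to the
// default one, and fail loudly if neither exists.
val session: SparkSession = SparkSession.getActiveSession
  .orElse(SparkSession.getDefaultSession)
  .getOrElse(throw new IllegalStateException("No SparkSession found"))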

answered Oct 24 '22 by Andrei Boaghe


When the SparkSession variable has been defined as

val sparkSession = SparkSession.builder.master("local[*]").appName("Simple Application").getOrCreate()

This variable will always refer to the same SparkSession, since it is a val. You can also pass it to other classes so they can use it:

val newClassCall = new NewClass(sparkSession)

Now you can use the same sparkSession in that new class as well.
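
For illustration, a minimal sketch of such a class (NewClass and its method are hypothetical):

import org.apache.spark.sql.{DataFrame, SparkSession}

// Hypothetical class that receives the session through its constructor
class NewClass(spark: SparkSession) {
  def loadCsv(path: String): DataFrame =
    spark.read.format("csv").load(path)
}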

answered Oct 24 '22 by Ramesh Maharjan


This is an old question and there are a couple of answers that are good enough, but I would like to offer one more approach.

You can create a trait that extends Serializable and holds the SparkSession as a lazy val. Then, throughout your project, every object that extends that trait gets access to the same SparkSession instance.

Code as below:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.DataFrame

// Trait that lazily creates (or reuses) the SparkSession on first access
trait SparkSessionWrapper extends Serializable {
  lazy val spark: SparkSession = {
    SparkSession.builder().appName("TestApp").getOrCreate()
  }
}

// Object with the main method; it extends SparkSessionWrapper
object App extends SparkSessionWrapper {
  def main(args: Array[String]): Unit = {
    val readdf = ReadFileProcessor.ReadFile("testpath")
    readdf.createOrReplaceTempView("TestTable")
    val viewdf = spark.sql("Select * from TestTable")
  }
}

object ReadFileProcessor extends SparkSessionWrapper {
  def ReadFile(path: String): DataFrame = {
    val df = spark.read.format("csv").load(path)
    df
  }
}

Because both objects extend SparkSessionWrapper, the SparkSession is initialized the first time the spark variable is used in the code, and from then on any object extending the trait can refer to it without it being passed as a method parameter. This gives you an experience similar to a notebook.

Update: if you want to make this more generic and need to set a custom app name based on the type of workflow you are running, you can do it as below:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.DataFrame

trait SparkSessionWrapper extends Serializable {
  lazy val spark: SparkSession = {
    createSparkSession(appname)
  }

  // Each concrete object provides its own app name
  def appname: String

  def createSparkSession(appname: String): SparkSession = {
    SparkSession.builder().appName(appname).master("local[*]").getOrCreate()
  }
}

// Object with the main method; it extends SparkSessionWrapper
object App extends SparkSessionWrapper {
  def main(args: Array[String]): Unit = {
    val readdf = ReadFileProcessor.ReadFile("testpath")
    readdf.createOrReplaceTempView("TestTable")
    val viewdf = spark.sql("Select * from TestTable")
  }

  override def appname: String = "ReadFile"
}

object ReadFileProcessor extends SparkSessionWrapper {
  def ReadFile(path: String): DataFrame = {
    val df = spark.read.format("csv").load(path)
    df
  }

  override def appname: String = "ReadcsvFile"
}

The main difference is that the trait now declares an abstract method, and every startup object that extends it must override that method to provide the app-name value.

answered Oct 24 '22 by Nikunj Kakadiya