
How many SparkSessions can a single application have?

I have found that as Spark runs and tables grow in size (through joins), the Spark executors eventually run out of memory and the entire system crashes. Even if I write temporary results to Hive tables (on HDFS), the system still doesn't free much memory, and everything crashes after about 130 joins.

However, through experimentation, I realized that if I break the problem into smaller pieces, write temporary results to Hive tables, and stop and restart the Spark session (and Spark context), then the system's resources are freed. I was able to join over 1,000 columns using this approach.
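
For illustration, a minimal sketch of the stop/restart pattern described above. It assumes local mode and writes the intermediate result to a temporary Parquet directory rather than to Hive tables on HDFS; the names StagedJoinsSketch, freshSession and runStages are hypothetical and chosen only for this example.

```scala
import java.nio.file.Files
import org.apache.spark.sql.SparkSession

object StagedJoinsSketch {
  // Hypothetical helper: builds a session (and a new SparkContext if none is active).
  private def freshSession(): SparkSession =
    SparkSession.builder().master("local[*]").appName("staged-joins-sketch").getOrCreate()

  // Stage 1 joins two small tables and persists the result; stage 2 reads it
  // back in a brand new session. Returns the row count seen in stage 2.
  def runStages(): Long = {
    // Assumption: a local temp directory stands in for the Hive/HDFS location.
    val out = Files.createTempDirectory("stage").resolve("stage1").toString

    val s1 = freshSession()
    val left = s1.range(5).toDF("id")
    val right = s1.range(5).selectExpr("id", "id * 2 AS doubled")
    left.join(right, "id").write.parquet(out)
    s1.stop() // stops the underlying SparkContext as well

    val s2 = freshSession()
    try s2.read.parquet(out).count()
    finally s2.stop()
  }

  def main(args: Array[String]): Unit =
    println(s"rows read back in the second session: ${runStages()}")
}
```

Whether restarting the context is a good idea is exactly what the question asks; the sketch only shows the mechanics.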

But I can't find any documentation that says whether this is considered good practice or not (I know you should not acquire multiple sessions at once). Most systems acquire the session at the beginning and close it at the end. I could also break the application into smaller ones and use a workflow scheduler like Oozie to run these smaller applications on YARN. But this approach would start and stop the JVM at each stage, which seems a bit heavyweight.

So my question: is it bad practice to repeatedly start and stop the Spark session to free system resources during the run of a single Spark application?


But can you elaborate on what you mean by a single SparkContext on a single JVM? I was able to call sparkSession.sparkContext().stop(), and also stop the SparkSession. I then created a new SparkSession and used a new sparkContext. No error was thrown.

I was also able to use this on the JavaSparkPi without any problems.

I have tested this in yarn-client mode and on a local Spark install.

What exactly does stopping the spark context do, and why can you not create a new one once you've stopped one?

asked Dec 08 '17 23:12 by irbull



1 Answer

TL;DR You can have as many SparkSessions as needed.

You can have one and only one SparkContext on a single JVM, but the number of SparkSessions is pretty much unbounded.
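
As a rough illustration of that statement, the sketch below creates several sessions with newSession() and checks that they all wrap the very same SparkContext instance. It assumes local mode; SharedContextSketch and sessionsShareContext are hypothetical names used only here.

```scala
import org.apache.spark.sql.SparkSession

object SharedContextSketch {
  // Returns true when every session created via newSession() reuses
  // the SparkContext of the session it was derived from.
  def sessionsShareContext(n: Int): Boolean = {
    val spark = SparkSession.builder()
      .master("local[*]") // assumption: local mode, for illustration only
      .appName("shared-context-sketch")
      .getOrCreate()
    try {
      val sessions = Seq.fill(n)(spark.newSession())
      sessions.forall(_.sparkContext eq spark.sparkContext)
    } finally {
      spark.stop()
    }
  }

  def main(args: Array[String]): Unit =
    println(s"all sessions share one SparkContext: ${sessionsShareContext(3)}")
}
```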

But can you elaborate on what you mean by a single SparkContext on a single JVM?

It means that at any given time in the lifecycle of a Spark application there can be one and only one driver, which in turn means that there is one and only one SparkContext available on that JVM.

The driver of a Spark application is where the SparkContext lives (or rather the opposite: the SparkContext defines the driver; the distinction is pretty blurry).

You can only have one SparkContext at a time. You can start and stop it on demand as many times as you want, but I remember an issue saying that you should not close the SparkContext unless you're done with Spark (which usually happens at the very end of your Spark application).

In other words, have a single SparkContext for the entire lifetime of your Spark application.

There was a similar question, "What's the difference between SparkSession.sql vs Dataset.sqlContext.sql?", about multiple SparkSessions that can shed more light on why you'd want to have two or more sessions.

I was able to call sparkSession.sparkContext().stop(), and also stop the SparkSession.

So?! How does this contradict what I said?! You stopped the only SparkContext available on the JVM. Not a big deal. You could, but that's just one part of "you can only have one and only one SparkContext on a single JVM available", isn't it?

SparkSession is a mere wrapper around SparkContext to offer Spark SQL's structured/SQL features on top of Spark Core's RDDs.

From the point of view of a Spark SQL developer, the purpose of a SparkSession is to be a namespace for query entities like tables, views or functions that your queries use (as DataFrames, Datasets or SQL) and for Spark properties (that could have different values per SparkSession).

If you'd like to have the same (temporary) table name used for different Datasets, creating two SparkSessions would be what I'd consider the recommended way.
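
A minimal sketch of that idea, assuming local mode: the temporary view name "t" is registered in two sessions for two different Datasets, and each session resolves the name to its own data. SameViewNameSketch and countsPerSession are hypothetical names for this example only.

```scala
import org.apache.spark.sql.SparkSession

object SameViewNameSketch {
  // Registers the temporary view "t" in two sessions and returns the row
  // counts seen through that name in the first and the second session.
  def countsPerSession(): (Long, Long) = {
    val spark = SparkSession.builder()
      .master("local[*]") // assumption: local mode, for illustration only
      .appName("same-view-name-sketch")
      .getOrCreate()
    try {
      val other = spark.newSession()
      // Temporary views are scoped to the session that created them.
      spark.range(3).createOrReplaceTempView("t")
      other.range(5).createOrReplaceTempView("t")
      (spark.table("t").count(), other.table("t").count())
    } finally {
      spark.stop()
    }
  }

  def main(args: Array[String]): Unit =
    println(s"row counts behind view t per session: ${countsPerSession()}")
}
```

Global temporary views (createGlobalTempView) are shared across sessions, so this separation applies only to regular temporary views.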

I've just worked on an example to showcase how whole-stage codegen works in Spark SQL and created the following, which simply turns the feature off in a second session.

// both where and select operators support whole-stage codegen
// the plan tree (with the operators and expressions) meets the requirements
// That's why the plan has WholeStageCodegenExec inserted
// You can see stars (*) in the output of explain
val q = Seq((1,2,3)).toDF("id", "c0", "c1").where('id === 0).select('c0)
scala> q.explain
== Physical Plan ==
*Project [_2#89 AS c0#93]
+- *Filter (_1#88 = 0)
   +- LocalTableScan [_1#88, _2#89, _3#90]

// Let's break the requirement of having up to spark.sql.codegen.maxFields
// I'm creating a brand new SparkSession with one property changed
val newSpark = spark.newSession()
import org.apache.spark.sql.internal.SQLConf.WHOLESTAGE_MAX_NUM_FIELDS
newSpark.sessionState.conf.setConf(WHOLESTAGE_MAX_NUM_FIELDS, 2)

scala> println(newSpark.sessionState.conf.wholeStageMaxNumFields)
2

// Let's see what the initial value is
// Note that I use the spark value (not newSpark)
scala> println(spark.sessionState.conf.wholeStageMaxNumFields)
100

import newSpark.implicits._
// the same query as above but created in SparkSession with WHOLESTAGE_MAX_NUM_FIELDS as 2
val q = Seq((1,2,3)).toDF("id", "c0", "c1").where('id === 0).select('c0)

// Note that there are no stars in the output of explain
// No WholeStageCodegenExec operator in the plan => whole-stage codegen disabled
scala> q.explain
== Physical Plan ==
Project [_2#122 AS c0#126]
+- Filter (_1#121 = 0)
   +- LocalTableScan [_1#121, _2#122, _3#123]

I then created a new SparkSession and used a new SparkContext. No error was thrown.

Again, how does this contradict what I said about a single SparkContext being available? I'm curious.

What exactly does stopping the spark context do, and why can you not create a new one once you've stopped one?

You can no longer use it to run Spark jobs (to process large and distributed datasets), which is pretty much exactly the reason why you use Spark in the first place, isn't it?

Try the following:

  1. Stop SparkContext
  2. Execute any processing using Spark Core's RDD or Spark SQL's Dataset APIs

An exception? Right! Remember that you close the "doors" to Spark so how could you have expected to be inside?! :)
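
The two steps above can be sketched as follows, assuming local mode. The attempt to run a job on the stopped context is wrapped in Try so the failure can be observed, and a new session is then built to show that a fresh SparkContext can be created once the old one is stopped (which matches what the question reports). StoppedContextSketch, build and run are hypothetical names.

```scala
import org.apache.spark.sql.SparkSession
import scala.util.Try

object StoppedContextSketch {
  // Hypothetical helper: builds a session (and a new SparkContext if none is active).
  private def build(): SparkSession =
    SparkSession.builder().master("local[*]").appName("stopped-context-sketch").getOrCreate()

  // Returns (did the job on the stopped context fail?, row count from the new context).
  def run(): (Boolean, Long) = {
    val first = build()
    val sc = first.sparkContext
    first.stop() // step 1: stop the SparkContext

    // step 2: try to use the stopped context; SparkContext rejects the call
    val failedAfterStop = Try(sc.parallelize(1 to 3).count()).isFailure

    // a new session creates a new SparkContext because the old one is stopped
    val second = build()
    try {
      (failedAfterStop, second.range(4).count())
    } finally {
      second.stop()
    }
  }

  def main(args: Array[String]): Unit =
    println(s"(failed after stop, rows from new context) = ${run()}")
}
```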

answered Dec 06 '22 20:12 by Jacek Laskowski