How can I tear down a SparkSession and create a new one within one application?


I have a pyspark program with multiple independent modules that can each independently process data to meet my various needs. They can also be chained together to process data in a pipeline. Each of these modules builds a SparkSession and executes perfectly on its own.

However, when I try to run them serially within the same Python process, I run into issues. When the second module in the pipeline executes, Spark complains that the SparkContext I am attempting to use has been stopped:

py4j.protocol.Py4JJavaError: An error occurred while calling o149.parquet. : java.lang.IllegalStateException: Cannot call methods on a stopped SparkContext. 

Each of these modules builds a SparkSession at the beginning of execution and stops the sparkContext at the end of its process. I build and stop sessions/contexts like so:

session = SparkSession.builder.appName("myApp").getOrCreate()
session.stop()

According to the official documentation, getOrCreate "gets an existing SparkSession or, if there is no existing one, creates a new one based on the options set in this builder." But I don't want this behavior, where the builder attempts to reuse an existing session. I can't find any way to disable it, and I can't figure out how to destroy the session -- I only know how to stop its associated SparkContext.
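
For illustration, a minimal sketch of that documented behavior (the app name is just a placeholder): a second getOrCreate call hands back the session created by the first one rather than building a fresh one.

    from pyspark.sql import SparkSession

    s1 = SparkSession.builder.appName("myApp").getOrCreate()
    s2 = SparkSession.builder.appName("myApp").getOrCreate()

    # Both names refer to the very same session object
    assert s1 is s2

    s1.stop()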

How can I build new SparkSessions in independent modules, and execute them in sequence in the same Python process without previous sessions interfering with the newly created ones?

The following is an example of the project structure:

main.py

import collect
import process

if __name__ == '__main__':
    data = collect.execute()
    process.execute(data)

collect.py

from pyspark.sql import SparkSession

import datagetter

def execute(data=None):
    session = SparkSession.builder.appName("myApp").getOrCreate()
    data = data if data else datagetter.get()
    rdd = session.sparkContext.parallelize(data)
    # [... do some work here ...]
    result = rdd.collect()
    session.stop()
    return result

process.py

from pyspark.sql import SparkSession

import datagetter

def execute(data=None):
    session = SparkSession.builder.appName("myApp").getOrCreate()
    data = data if data else datagetter.get()
    rdd = session.sparkContext.parallelize(data)
    # [... do some work here ...]
    result = rdd.collect()
    session.stop()
    return result

People also ask

Can we create multiple SparkSession?

Starting in Spark 2.0, the SparkSession encapsulates both the SparkContext and the SQL context functionality. Spark applications can use multiple sessions to use different underlying data catalogs. You can use an existing Spark session to create a new session by calling the newSession method.
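
To illustrate that last point, here is a minimal sketch (the app name and view name are placeholders) showing that newSession yields a separate SparkSession which still shares the single underlying SparkContext:

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.appName("demo").getOrCreate()
    other = spark.newSession()

    # Both sessions are backed by the same SparkContext
    assert spark.sparkContext is other.sparkContext

    # But temporary views (and SQL configuration) are scoped per session
    spark.range(5).createOrReplaceTempView("numbers")
    assert "numbers" not in [t.name for t in other.catalog.listTables()]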

Can we create multiple SparkContext?

Since the question talks about SparkSessions, it's important to point out that there can be multiple SparkSessions running but only a single SparkContext per JVM.

How many SparkContext can be created?

Only one SparkContext may be active per JVM. You must stop() the active SparkContext before creating a new one. The first thing a Spark program must do is to create a JavaSparkContext object, which tells Spark how to access a cluster.
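
As a hedged sketch of that contract (the app names and the local master are placeholders), stopping the active context before constructing the next one looks like this in PySpark:

    from pyspark import SparkConf, SparkContext

    sc = SparkContext(conf=SparkConf().setAppName("first").setMaster("local[2]"))
    sc.stop()  # the active context must be stopped first...

    # ...only then can a new one be created in the same JVM
    sc = SparkContext(conf=SparkConf().setAppName("second").setMaster("local[2]"))
    sc.stop()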

How do you get a SparkSession from a data frame?

If you have a DataFrame, you can use it to access its SparkSession, but it's best to just grab the SparkSession with getActiveSession(). Shutting down the active SparkSession demonstrates that getActiveSession() returns None when no session exists.
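
A small sketch of that, assuming a PySpark version (3.0 or later) where SparkSession.getActiveSession() is available; the app name is a placeholder:

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.appName("demo").getOrCreate()

    # While a session is running, getActiveSession() returns it
    assert SparkSession.getActiveSession() is not None

    spark.stop()

    # After the shutdown there is no active session left to return
    assert SparkSession.getActiveSession() is None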


1 Answer

Long story short, Spark (including PySpark) is not designed to handle multiple contexts in a single application. If you're interested in the JVM side of the story, I would recommend reading SPARK-2243 (resolved as Won't Fix).

A number of design decisions made in PySpark reflect that, including, but not limited to, a singleton Py4J gateway. Effectively, you cannot have multiple SparkContexts in a single application. A SparkSession is not only bound to a SparkContext but also introduces problems of its own, like handling the local (standalone) Hive metastore if one is used. Moreover, there are functions which use SparkSession.builder.getOrCreate internally and depend on the behavior you see right now. A notable example is UDF registration. Other functions may exhibit unexpected behavior if multiple SQL contexts are present (for example RDD.toDF).
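
To make the UDF example concrete, here is a minimal sketch (the app name, data and column name are placeholders): the UDF is never handed a session explicitly, so the machinery behind it falls back on whatever SparkSession.builder.getOrCreate returns at that point, which is exactly the implicit coupling described above.

    from pyspark.sql import SparkSession
    from pyspark.sql.functions import udf
    from pyspark.sql.types import IntegerType

    spark = SparkSession.builder.appName("udf-demo").getOrCreate()
    df = spark.createDataFrame([(1,), (2,)], ["x"])

    # The session backing the UDF is resolved implicitly, not passed in
    double = udf(lambda v: v * 2, IntegerType())
    df.select(double("x")).show()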

Multiple contexts are not only unsupported but also, in my personal opinion, violate the single responsibility principle. Your business logic shouldn't be concerned with all the setup, cleanup and configuration details.

My personal recommendations are as follows:

  • If the application consists of multiple coherent modules which can be composed and which benefit from a single execution environment with caching and a common metastore, initialize all required contexts in the application entry point and pass them down to the individual pipelines when necessary:

    • main.py:

      from pyspark.sql import SparkSession

      import collect
      import process

      if __name__ == "__main__":
          spark: SparkSession = ...

          # Pass data between modules
          collected = collect.execute(spark)
          processed = process.execute(spark, data=collected)
          ...
          spark.stop()
    • collect.py / process.py:

      from pyspark.sql import SparkSession

      def execute(spark: SparkSession, data=None):
          ...
  • Otherwise (and that seems to be the case here, based on your description) I would design the entry point to execute a single pipeline and use an external workflow manager (like Apache Airflow or Toil) to handle the execution.

    It is not only cleaner but also allows for much more flexible fault recovery and scheduling.

    The same thing can of course be done with builders, but as a smart person once said: Explicit is better than implicit.

    • main.py

      import argparse

      from pyspark.sql import SparkSession

      import collect
      import process

      pipelines = {"collect": collect, "process": process}

      if __name__ == "__main__":
          parser = argparse.ArgumentParser()
          parser.add_argument('--pipeline')
          args = parser.parse_args()

          spark: SparkSession = ...

          # Execute a single pipeline only for side effects
          pipelines[args.pipeline].execute(spark)
          spark.stop()
    • collect.py / process.py as in the previous point.
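
    With an entry point shaped like that, each Python process runs exactly one pipeline, for example python main.py --pipeline collect followed, in a separate process, by python main.py --pipeline process, with chaining, retries and scheduling left to the workflow manager.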

One way or another, I would keep one and only one place where the context is set up, and one and only one place where it is torn down.
