I have tried to write a transform method from DataFrame to DataFrame. And I also want to test it by scalatest.
As you know, in Spark 2.x with Scala API, you can create SparkSession object as follows:
import org.apache.spark.sql.SparkSession
val spark = SparkSession.bulider
.config("spark.master", "local[2]")
.getOrCreate()
This code works fine with unit tests. But, when I run this code with spark-submit, the cluster options did not work. For example,
spark-submit --master yarn --deploy-mode client --num-executors 10 ...
does not create any executors.
I have found that the spark-submit arguments are applied when I remove config("master", "local[2]")
part of the above code.
But, without master setting the unit test code did not work.
I tried to split spark (SparkSession) object generation part to test and main.
But there is so many code blocks needs spark, for example import spark.implicit,_
and spark.createDataFrame(rdd, schema)
.
Is there any best practice to write a code to create spark object both to test and to run spark-submit?
appName () – Sets a name to the Spark application that shows in the Spark web UI. If no application name is set, it sets a random name. getOrCreate () – This returns a SparkSession object if already exists. Creates a new one if not exist. You can get the existing SparkSession in Scala programmatically using the below example.
You can create as many SparkSession as you want in a Spark application using either SparkSession.builder () or SparkSession.newSession () . Many Spark session objects are required when you wanted to keep Spark tables (relational entities) logically separated. 2. SparkSession in spark-shell
SparkSession will be created using SparkSession.builder () builder patterns. Prior to Spark 2.0, SparkContext used to be an entry point, and it’s not been completely replaced with SparkSession, many features of SparkContext are still available and used in Spark 2.0 and later.
In Spark or PySpark SparkSession object is created programmatically using SparkSession.builder () and if you are using Spark shell SparkSession object “ spark ” is created by default for you as an implicit object whereas SparkContext is retrieved from the Spark session object by using sparkSession.sparkContext.
One way is to create a trait which provides the SparkContext/SparkSession, and use that in your test cases, like so:
trait SparkTestContext {
private val master = "local[*]"
private val appName = "testing"
System.setProperty("hadoop.home.dir", "c:\\winutils\\")
private val conf: SparkConf = new SparkConf()
.setMaster(master)
.setAppName(appName)
.set("spark.driver.allowMultipleContexts", "false")
.set("spark.ui.enabled", "false")
val ss: SparkSession = SparkSession.builder().config(conf).enableHiveSupport().getOrCreate()
val sc: SparkContext = ss.sparkContext
val sqlContext: SQLContext = ss.sqlContext
}
And your test class header then looks like this for example:
class TestWithSparkTest extends BaseSpec with SparkTestContext with Matchers{
I made a version where Spark will close correctly after tests.
import org.apache.spark.sql.{SQLContext, SparkSession}
import org.apache.spark.{SparkConf, SparkContext}
import org.scalatest.{BeforeAndAfterAll, FunSuite, Matchers}
trait SparkTest extends FunSuite with BeforeAndAfterAll with Matchers {
var ss: SparkSession = _
var sc: SparkContext = _
var sqlContext: SQLContext = _
override def beforeAll(): Unit = {
val master = "local[*]"
val appName = "MyApp"
val conf: SparkConf = new SparkConf()
.setMaster(master)
.setAppName(appName)
.set("spark.driver.allowMultipleContexts", "false")
.set("spark.ui.enabled", "false")
ss = SparkSession.builder().config(conf).getOrCreate()
sc = ss.sparkContext
sqlContext = ss.sqlContext
super.beforeAll()
}
override def afterAll(): Unit = {
sc.stop()
super.afterAll()
}
}
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With