How should I write unit tests in Spark, for a basic data frame creation example?

I'm struggling to write a basic unit test for the creation of a DataFrame, using the example text file provided with Spark, as follows.

class dataLoadTest extends FunSuite with Matchers with BeforeAndAfterEach {

  private val master = "local[*]"
  private val appName = "data_load_testing"

  private var spark: SparkSession = _

  override def beforeEach() {
    spark = new SparkSession.Builder().appName(appName).getOrCreate()
  }

  import spark.implicits._

  case class Person(name: String, age: Int)

  val df = spark.sparkContext
    .textFile("/Applications/spark-2.2.0-bin-hadoop2.7/examples/src/main/resources/people.txt")
    .map(_.split(","))
    .map(attributes => Person(attributes(0), attributes(1).trim.toInt))
    .toDF()

  test("Creating dataframe should produce data frame of correct size") {
    assert(df.count() == 3)
    assert(df.take(1).equals(Array("Michael", 29)))
  }

  override def afterEach(): Unit = {
    spark.stop()
  }

}

I know that the code itself works (from spark.implicits._ ... toDF()) because I have verified this in the Spark Scala shell, but inside the test class I'm getting lots of errors: the IDE does not recognise import spark.implicits._ or toDF(), and therefore the tests don't run.

I am using SparkSession which automatically creates SparkConf, SparkContext and SQLContext under the hood.
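
For reference, this is the kind of setup I mean (a minimal sketch; the app name and master are just the values from my test):

import org.apache.spark.sql.SparkSession

// a single SparkSession exposes the other contexts under the hood
val spark = SparkSession.builder()
  .appName("data_load_testing")
  .master("local[*]")
  .getOrCreate()

val sc  = spark.sparkContext  // SparkContext created by the session
val sql = spark.sqlContext    // SQLContext created by the session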

My code simply uses the example code from the Spark repo.

Any ideas why this is not working? Thanks!

NB. I have already looked at the Spark unit-test questions on StackOverflow, like this one: How to write unit tests in Spark 2.0+? I used that to write the test, but I'm still getting the errors.

I'm using Scala 2.11.8, and Spark 2.2.0 with SBT and IntelliJ. These dependencies are correctly included within the SBT build file. The errors on running the tests are:

Error:(29, 10) value toDF is not a member of org.apache.spark.rdd.RDD[dataLoadTest.this.Person] possible cause: maybe a semicolon is missing before `value toDF'? .toDF()

Error:(20, 20) stable identifier required, but dataLoadTest.this.spark.implicits found. import spark.implicits._

IntelliJ won't recognise import spark.implicits._ or the .toDF() method.

I have imported:

import org.apache.spark.sql.SparkSession
import org.scalatest.{BeforeAndAfterEach, FlatSpec, FunSuite, Matchers}

asked by LucieCBurgess

2 Answers

You need to assign the sqlContext to a val for implicits to work. Since your sparkSession is a var, it is not a stable identifier, so the import won't compile against it.

So you need to do the following:

val sQLContext = spark.sqlContext
import sQLContext.implicits._

Moreover, you can do this work inside the test functions themselves, so that your test class looks like the following:

class dataLoadTest extends FunSuite with Matchers with BeforeAndAfterEach {

  private val master = "local[*]"
  private val appName = "data_load_testing"

  var spark: SparkSession = _

  override def beforeEach() {
    // create a fresh SparkSession before every test
    spark = new SparkSession.Builder().appName(appName).master(master).getOrCreate()
  }

  test("Creating dataframe should produce data frame of correct size") {
    // sqlContext must be a val (a stable identifier) for the implicits import to compile
    val sQLContext = spark.sqlContext
    import sQLContext.implicits._

    val df = spark.sparkContext
      .textFile("/Applications/spark-2.2.0-bin-hadoop2.7/examples/src/main/resources/people.txt")
      .map(_.split(","))
      .map(attributes => Person(attributes(0), attributes(1).trim.toInt))
      .toDF()

    assert(df.count() == 3)
    assert(df.take(1)(0)(0).equals("Michael"))
  }

  override def afterEach() {
    spark.stop()
  }

}
case class Person(name: String, age: Int)
answered by Ramesh Maharjan

There are many libraries for unit testing Spark; one of the most widely used is

spark-testing-base: By Holden Karau
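
It is added as a test dependency in the SBT build file; the exact version string below is only an assumption and should be the artifact matching your Spark release:

// build.sbt – version string is illustrative; pick the release matching Spark 2.2.0
libraryDependencies += "com.holdenkarau" %% "spark-testing-base" % "2.2.0_0.8.0" % "test"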

This library comes with everything prepared, exposing sc as the SparkContext; below is a simple example:

class TestSharedSparkContext extends FunSuite with SharedSparkContext {

  val expectedResult = List(("a", 3),("b", 2),("c", 4))

  test("Word counts should be equal to expected") {
    verifyWordCount(Seq("c a a b a c b c c"))
  }

  def verifyWordCount(seq: Seq[String]): Unit = {
    assertResult(expectedResult)(new WordCount().transform(sc.makeRDD(seq)).collect().toList)
  }
}

Here, everything is prepared for you, with sc available as the SparkContext.
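
The WordCount class itself is not part of the library; a minimal sketch of what it might look like for the test above (the class name and transform signature come from the test, the body is an assumption):

import org.apache.spark.rdd.RDD

// Hypothetical WordCount used by the test above: split each line on whitespace,
// count every word, and sort by key so the collected result has a stable order.
class WordCount extends Serializable {
  def transform(lines: RDD[String]): RDD[(String, Int)] =
    lines
      .flatMap(_.split("\\s+"))
      .map(word => (word, 1))
      .reduceByKey(_ + _)
      .sortByKey()
}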

Another approach is to create a test wrapper trait and reuse it across multiple test cases, as below:

import org.apache.spark.sql.SparkSession

trait TestSparkWrapper {

  lazy val sparkSession: SparkSession = 
    SparkSession.builder().master("local").appName("spark test example ").getOrCreate()

}

And use this TestSparkWrapper for all the tests with ScalaTest, combining it with BeforeAndAfterAll and BeforeAndAfterEach, as in the sketch below.
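
For example, a minimal sketch of a suite that mixes in the wrapper and stops the session once after all tests (the suite name and DataFrame contents are purely illustrative):

import org.scalatest.{BeforeAndAfterAll, FunSuite, Matchers}

class PersonDataFrameTest extends FunSuite with Matchers
    with TestSparkWrapper with BeforeAndAfterAll {

  test("people DataFrame has the expected size") {
    // sparkSession comes from the wrapper trait and is a stable identifier,
    // so the implicits import compiles here
    import sparkSession.implicits._

    // illustrative data; a real test would read the file under test
    val df = Seq(("Michael", 29), ("Andy", 30), ("Justin", 19)).toDF("name", "age")

    df.count() shouldBe 3
  }

  override def afterAll(): Unit = {
    sparkSession.stop()
  }
}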

Hope this helps!

answered by koiralo