I'm trying to test a part of my program that performs transformations on DataFrames. I want to test several different variations of these DataFrames, which rules out the option of reading a specific DataFrame from a file.
And so my questions are:
I obviously googled that before but could not find anything which was very useful. Among the more useful links I found were:
It would be great if the examples/tutorials were in Scala, but I'll take whatever language you've got.
Thanks in advance.
spark-testing-base is a library that simplifies unit testing of Spark applications. It provides utility classes that create ready-to-use Spark sessions, plus DataFrame utility methods that can be used in assert statements. ScalaTest is a testing tool that can be used to unit test Scala and Java code.
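As a rough illustration of the assert-style helpers mentioned above, here is a minimal sketch using spark-testing-base's `DataFrameSuiteBase` trait and its `assertDataFrameEquals` method. It assumes the library and an older ScalaTest release (one that still has `org.scalatest.FunSuite`) are on the test classpath; the suite name, test names, and column name are made up for the example.

```scala
import com.holdenkarau.spark.testing.DataFrameSuiteBase
import org.scalatest.FunSuite

// Hypothetical suite name; DataFrameSuiteBase supplies sc and sqlContext.
class SampleDataFrameTest extends FunSuite with DataFrameSuiteBase {
  test("identical DataFrames compare equal") {
    val sqlCtx = sqlContext
    import sqlCtx.implicits._

    val input = sc.parallelize(List(1, 2, 3)).toDF("n")
    assertDataFrameEquals(input, input)
  }

  test("different DataFrames fail the assertion") {
    val sqlCtx = sqlContext
    import sqlCtx.implicits._

    val left = sc.parallelize(List(1, 2, 3)).toDF("n")
    val right = sc.parallelize(List(4, 5, 6)).toDF("n")
    // The comparison is expected to throw, so the test passes when it does.
    intercept[org.scalatest.exceptions.TestFailedException] {
      assertDataFrameEquals(left, right)
    }
  }
}
```

The trait manages the Spark context for the whole suite, so individual tests only build their inputs and compare results.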
Build a simple ETL function in PySpark. In order to write a test case, we will first need functionality that needs to be tested. In this example, we will write a function that performs a simple transformation. On a fundamental level an ETL job must do the following: Extract data from a source.
This link shows how to programmatically create a DataFrame with a schema. You can keep the test data in separate traits and mix them into your tests. For instance:
// This example assumes CSV data, but the same approach should work for other formats as well.
trait TestData {
  val data1 = List(
    "this,is,valid,data",
    "this,is,in-valid,data"
  )
  val data2 = ...
}
Then with ScalaTest, we can do something like this.
import org.scalatest.{FlatSpec, Matchers}

class MyDFTest extends FlatSpec with Matchers {
  "method" should "perform this" in new TestData {
    // You can access your test data here. Use it to create the DataFrame.
    // Your test here.
  }
}
To create the DataFrame, you can write a few utility methods like the ones below.
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._

// Builds a StructType from parallel arrays of type names and column names.
def schema(types: Array[String], cols: Array[String]) = {
  val datatypes = types.map {
    case "String" => StringType
    case "Long" => LongType
    case "Double" => DoubleType
    // Add more types here based on your data.
    case _ => StringType
  }
  StructType(cols.indices.map(x => StructField(cols(x), datatypes(x))).toArray)
}
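// Assumes sc (SparkContext), sqlContext and an opencsv-style CSVParser are in scope.
def df(data: List[String], types: Array[String], cols: Array[String]) = {
  val lines = sc.parallelize(data)
  // Build the parser per partition so it is not captured in the task closure.
  val split = lines.mapPartitions { iter =>
    val parser = new CSVParser(',')
    iter.map(line => parser.parseLine(line))
  }
  // Fields are still strings here; convert them if the schema uses Long or Double.
  val rows = split.map(arr => Row.fromSeq(arr.toSeq))
  sqlContext.createDataFrame(rows, schema(types, cols))
}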
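If parsing CSV strings is more machinery than a test needs, the input variations can also be built straight from in-memory collections. The sketch below assumes Spark 2.x or later (for `SparkSession`); the object name, column names, sample rows, and the `keepLargeCounts` transformation are all hypothetical stand-ins for your own code.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.col

object InMemoryDataFrames {
  // Local session for tests only; the master and app name are arbitrary choices.
  lazy val spark: SparkSession = SparkSession.builder()
    .master("local[2]")
    .appName("df-unit-test")
    .getOrCreate()

  import spark.implicits._

  // Hypothetical input variations, each built directly from a Seq of tuples.
  def validInput: DataFrame =
    Seq(("a", 1L, 1.5), ("b", 2L, 2.5)).toDF("name", "count", "score")

  def emptyInput: DataFrame =
    Seq.empty[(String, Long, Double)].toDF("name", "count", "score")

  // Hypothetical transformation under test: keeps rows whose count exceeds 1.
  def keepLargeCounts(df: DataFrame): DataFrame =
    df.filter(col("count") > 1L)
}
```

Each variation is a one-line definition, so adding another edge case (nulls, duplicates, extreme values) does not require a new fixture file.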
I am not aware of any utility classes for checking specific values in a DataFrame. But I think it should be simple to write one using the DataFrame APIs.
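One possible shape for such a helper, written only against the standard DataFrame API, is sketched here. The names `DataFrameChecks` and `sameContent` are invented for the example. It collects both DataFrames to the driver, so it is meant for small test data only, and it compares values exactly (no tolerance for floating-point differences).

```scala
import org.apache.spark.sql.{DataFrame, Row}

object DataFrameChecks {
  // Counts how many times each distinct row occurs, so duplicates are respected.
  private def rowCounts(df: DataFrame): Map[Row, Int] =
    df.collect().groupBy(identity).map { case (row, dupes) => row -> dupes.length }

  // True when both DataFrames have the same schema (including nullability)
  // and the same rows, ignoring row order.
  def sameContent(actual: DataFrame, expected: DataFrame): Boolean =
    actual.schema == expected.schema && rowCounts(actual) == rowCounts(expected)
}
```

In a ScalaTest spec this could be used as `DataFrameChecks.sameContent(result, expected) shouldBe true`, with the expected DataFrame built from in-memory test data.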