
How do I unit test PySpark programs?

My current Java/Spark unit test approach works (detailed here) by instantiating a SparkContext with the "local" master and running the tests with JUnit.

The code has to be organized so that I/O happens in one function, which then calls another function that works on multiple RDDs.

This works great. I have a highly tested data transformation written in Java + Spark.

Can I do the same with Python?

How would I run Spark unit tests with Python?

asked Nov 19 '15 by pettinato

People also ask

How do you test PySpark DataFrames?

You can test PySpark code by running your code on DataFrames in the test suite and comparing column equality or equality of two entire DataFrames. The quinn project has several examples. Create a tests/conftest.py file with a SparkSession fixture so you can easily access the SparkSession in your tests.
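For illustration, here is a minimal sketch of such a setup. The fixture name (spark), the file names, and the example transformation are my own assumptions, not taken from the quinn project:

    # tests/conftest.py -- sketch of a shared SparkSession fixture (names are assumptions)
    import pytest
    from pyspark.sql import SparkSession

    @pytest.fixture(scope="session")
    def spark():
        session = (SparkSession.builder
                   .master("local[2]")
                   .appName("pyspark-unit-tests")
                   .getOrCreate())
        yield session
        session.stop()

    # tests/test_transform.py -- compares two whole DataFrames by collecting their rows
    from pyspark.sql import functions as F

    def test_adds_greeting_column(spark):
        source = spark.createDataFrame([("alice",), ("bob",)], ["name"])
        expected = spark.createDataFrame(
            [("alice", "hi alice"), ("bob", "hi bob")], ["name", "greeting"])
        # hypothetical transformation under test: add a greeting column
        actual = source.withColumn("greeting", F.concat(F.lit("hi "), F.col("name")))
        assert sorted(actual.collect()) == sorted(expected.collect())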

How do you unit test a program?

A typical unit test has three phases: first, it initializes a small piece of the application it wants to test (the system under test, or SUT); then it applies some stimulus to the system under test (usually by calling a method on it); finally, it observes the resulting behavior.
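A toy example of those three phases written for pytest (illustrative only, not PySpark-specific):

    def add(a, b):
        return a + b

    def test_add():
        # phase 1: initialize the system under test and its inputs
        x, y = 2, 3
        # phase 2: apply the stimulus by calling the method
        result = add(x, y)
        # phase 3: observe the resulting behavior
        assert result == 5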

What is assert in PySpark?

The assert keyword is used when debugging code. It lets you test whether a condition in your code is True; if not, the program raises an AssertionError. You can also provide a message to be shown when the condition is False, as in the example below.
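For example (plain Python, not PySpark-specific):

    word_counts = {'hello': 2, 'spark': 3}
    # passes silently because the condition is True
    assert 'spark' in word_counts
    # raises AssertionError with the given message if the condition is False
    assert word_counts['spark'] == 3, "expected 'spark' to appear 3 times"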


1 Answer

I'd recommend using py.test as well. py.test makes it easy to create re-usable SparkContext test fixtures and use them to write concise test functions. You can also specialize fixtures (to create a StreamingContext, for example) and use one or more of them in your tests.

I wrote a blog post on Medium on this topic:

https://engblog.nextdoor.com/unit-testing-apache-spark-with-py-test-3b8970dc013b

Here is a snippet from the post:

    import pytest
    # wordcount is the module under test from the blog post
    import wordcount

    pytestmark = pytest.mark.usefixtures("spark_context")

    def test_do_word_counts(spark_context):
        """ test word counting
        Args:
           spark_context: test fixture SparkContext
        """
        test_input = [
            ' hello spark ',
            ' hello again spark spark'
        ]

        input_rdd = spark_context.parallelize(test_input, 1)
        results = wordcount.do_word_counts(input_rdd)

        expected_results = {'hello': 2, 'spark': 3, 'again': 1}
        assert results == expected_results
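The snippet relies on a spark_context fixture defined elsewhere (typically in conftest.py). A minimal sketch of such a fixture is below; this is my own reconstruction, and the exact configuration in the blog post may differ:

    # tests/conftest.py -- illustrative reconstruction of a spark_context fixture
    import pytest
    from pyspark import SparkConf, SparkContext

    @pytest.fixture(scope="session")
    def spark_context(request):
        conf = SparkConf().setMaster("local[2]").setAppName("pytest-pyspark-local-testing")
        sc = SparkContext(conf=conf)
        request.addfinalizer(lambda: sc.stop())
        return sc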
answered Sep 26 '22 by Vikas Kawadia