Testing Spark with pytest - cannot run Spark in local mode

I am trying to run a wordcount test with pytest, following this article: Unit testing Apache Spark with py.test. The problem is that I cannot start the Spark context. The code I use to create the Spark context:

import logging
import pytest
from pyspark import SparkConf, SparkContext

def quiet_py4j():
    """ turn down py4j logging for the test context """
    logging.getLogger('py4j').setLevel(logging.WARN)

@pytest.fixture(scope="session")
def spark_context(request):
    """ fixture for creating a spark context
    Args:
        request: pytest.FixtureRequest object
    """
    conf = (SparkConf().setMaster("local[2]").setAppName("pytest-pyspark-local-testing"))
    sc = SparkContext(conf=conf)
    request.addfinalizer(lambda: sc.stop())

    quiet_py4j()
    return sc

I execute this code using either of these commands:

#first way
pytest spark_context_fixture.py

#second way
python spark_context_fixture.py

The output:

platform linux2 -- Python 2.7.5, pytest-3.0.4, py-1.4.31, pluggy-0.4.0
rootdir: /home/mgr/test, inifile:
collected 0 items

Then I want to run the wordcount test using pytest:

pytestmark = pytest.mark.usefixtures("spark_context")

def test_do_word_counts(spark_context):
    """ test word couting
    Args:
        spark_context: test fixture SparkContext
    """
    test_input = [
        ' hello spark ',
        ' hello again spark spark'
    ]

    input_rdd = spark_context.parallelize(test_input, 1)
    results = wordcount.do_word_counts(input_rdd)

    expected_results = {'hello':2, 'spark':3, 'again':1}  
    assert results == expected_results

But the output is:

________ ERROR at setup of test_do_word_counts _________
file /home/mgrabowski/test/wordcount_test.py, line 5
  def test_do_word_counts(spark_context):
E       fixture 'spark_context' not found
>       available fixtures: cache, capfd, capsys, doctest_namespace, monkeypatch, pytestconfig, record_xml_property, recwarn, tmpdir, tmpdir_factory
>       use 'pytest --fixtures [testpath]' for help on them.

Does anyone know the reason for this issue?



1 Answer

I did some research and finally found the solution. I am using Spark 1.6.

First of all, I added two lines to my .bashrc file:

export SPARK_HOME=/usr/hdp/2.5.0.0-1245/spark
export PYTHONPATH=$SPARK_HOME/python/:$SPARK_HOME/python/lib/py4j-0.9-src.zip:$PYTHONPATH
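
A quick way to verify the paths before running any tests (the exact py4j zip name depends on your Spark distribution):

#run in a new shell after sourcing .bashrc;
#an ImportError here means SPARK_HOME or PYTHONPATH is wrong
python -c "import pyspark, py4j"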

Then I created a file named "conftest.py". The filename really matters: do not change it, otherwise pytest will not discover the fixtures and you will see the "fixture 'spark_context' not found" error again. If you use Spark in local mode and do not use YARN, conftest.py should look like this:

import logging
import pytest

from pyspark.sql import HiveContext
from pyspark import SparkConf
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

def quiet_py4j():
    """ turn down py4j logging for the test context """
    logger = logging.getLogger('py4j')
    logger.setLevel(logging.WARN)

@pytest.fixture(scope="session")
def spark_context(request):
    """ fixture for creating a SparkContext, stopped automatically at session end """
    conf = (SparkConf().setMaster("local[2]").setAppName("pytest-pyspark-local-testing"))
    sc = SparkContext(conf=conf)
    request.addfinalizer(lambda: sc.stop())

    quiet_py4j()
    return sc

@pytest.fixture(scope="session")
def hive_context(spark_context):
    return HiveContext(spark_context)

@pytest.fixture(scope="session")
def streaming_context(spark_context):
    return StreamingContext(spark_context, 1)
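
For example, a test that exercises the hive_context fixture could look like this (a hypothetical test of my own, not part of the original answer; it assumes your Spark build includes Hive support):

def test_create_dataframe(hive_context):
    # build a tiny DataFrame and check its size
    df = hive_context.createDataFrame([('a', 1), ('b', 2)], ['letter', 'number'])
    assert df.count() == 2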

Now you can run the tests with a simple pytest command. Pytest creates the Spark context before the first test and stops it after the whole session.
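
To confirm that pytest now discovers the fixtures (the error message in the question itself suggests this), list the available fixtures:

pytest --fixtures
#spark_context, hive_context and streaming_context should appear in the list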

If you use YARN, you can change conftest.py to:

import logging
import pytest

from pyspark.sql import HiveContext
from pyspark import SparkConf
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

def quiet_py4j():
    """ turn down spark logging for the test context """
    logger = logging.getLogger('py4j')
    logger.setLevel(logging.WARN)

# the marks inside params let you select a mode with "py.test -m spark_local"
# or "py.test -m spark_yarn" (this is the pytest 3.x style of marking params)
@pytest.fixture(scope="session",
                params=[pytest.mark.spark_local('local'),
                        pytest.mark.spark_yarn('yarn')])
def spark_context(request):
    if request.param == 'local':
        conf = (SparkConf()
                .setMaster("local[2]")
                .setAppName("pytest-pyspark-local-testing")
                )
    elif request.param == 'yarn':
        conf = (SparkConf()
                .setMaster("yarn-client")
                .setAppName("pytest-pyspark-yarn-testing")
                .set("spark.executor.memory", "1g")
                .set("spark.executor.instances", 2)
                )
    sc = SparkContext(conf=conf)
    request.addfinalizer(lambda: sc.stop())

    quiet_py4j()
    return sc

@pytest.fixture(scope="session")
def hive_context(spark_context):
    return HiveContext(spark_context)

@pytest.fixture(scope="session")
def streaming_context(spark_context):
    return StreamingContext(spark_context, 1)

Now you can run tests in local mode by calling py.test -m spark_local and in YARN mode by calling py.test -m spark_yarn.
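
If your pytest version warns about unknown markers, you can register them in a pytest.ini next to conftest.py. This is a minimal sketch, not part of the original setup:

[pytest]
markers =
    spark_local: tests that run against a local Spark master
    spark_yarn: tests that run against a YARN cluster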

Wordcount example

In the same folder create three files: conftest.py (above), wordcount.py:

def do_word_counts(lines):
    """ count occurrences of each word in an RDD of lines """
    counts = (lines.flatMap(lambda x: x.split())
                   .map(lambda x: (x, 1))
                   .reduceByKey(lambda x, y: x + y)
              )
    results = {word: count for word, count in counts.collect()}
    return results

And wordcount_test.py:

import pytest
import wordcount

pytestmark = pytest.mark.usefixtures("spark_context")

def test_do_word_counts(spark_context):
    test_input = [
        ' hello spark ',
        ' hello again spark spark'
    ]

    input_rdd = spark_context.parallelize(test_input, 1)
    results = wordcount.do_word_counts(input_rdd)

    expected_results = {'hello':2, 'spark':3, 'again':1}  
    assert results == expected_results

Now you can run the tests by calling pytest.
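
If you want to try do_word_counts outside of pytest, a standalone driver could look like this (a hypothetical sketch; the app name is just for illustration):

from pyspark import SparkConf, SparkContext
import wordcount

if __name__ == '__main__':
    # throwaway local context, just for this demo
    sc = SparkContext(conf=SparkConf().setMaster("local[2]").setAppName("wordcount-demo"))
    rdd = sc.parallelize([' hello spark ', ' hello again spark spark'])
    # expected: {'hello': 2, 'spark': 3, 'again': 1} (dict key order may vary)
    print(wordcount.do_word_counts(rdd))
    sc.stop()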
