Error while importing PySpark ETL module and running it as a child process using Python subprocess

Tags:

python

pyspark

I'm trying to call a list of PySpark modules dynamically from one main.py script, using importlib and subprocess. The child modules I'm calling do not return anything; they just perform their ETL operations. I want my main.py program to wait until the child process completes. In the code below, every time I try to call the child process I end up with the error "TypeError: 'NoneType' object is not iterable". There is one other problem: after initiating subprocess.Popen, I thought the flow would continue in main.py to the next line until it hits j1.wait(), but the immediate print statement (print("etl_01_job is running")) is not executing. Am I missing anything?

I googled and tried a lot of other approaches, but nothing is working. Can anyone shed some light on what I'm doing wrong? Once I'm able to successfully call the child process, I have to add a few other conditions based on the return code of the child process, but at this point I want to fix this issue. Thanks.

main.py

import json
import importlib
import subprocess
from datetime import datetime
from pyspark import SparkContext, SparkConf
from pyspark.sql.session import SparkSession


def main():
    with open('C:/Pyspark/test/config/config.json', 'r') as config_file:
        config = json.load(config_file)

    spark = SparkSession.builder\
        .appName(config.get("app_name"))\
        .getOrCreate()

    job_module1 = importlib.import_module("etl_01_job")
    print("main calling time :", datetime.now())
    j1 = subprocess.Popen(job_module1.run_etl_01_job(spark, config))
    print("etl_01_job is running")
    j1.wait() #I'm expecting the main.py to wait until child process finishes
    print("etl_01_job finished")

    job_module2 = importlib.import_module("etl_02_job")
    j2 = subprocess.Popen(job_module2.run_etl_02_job(spark, config))

if __name__ == "__main__":
    main()

Child PySpark job: etl_01_job.py (not the original code, just a sample script)

from datetime import datetime
import time
import sys

def etl_01_job(spark, config):
    print("I'm in 01etljob")
    print(config)
    print(config.get("app_name"))
    time.sleep(10)
    print("etljob 1 ending time :", datetime.now())

def run_etl_01_job(spark, config):
    etl_01_job(spark, config)

The error I'm getting is

Traceback (most recent call last):
  File "C:/py_spark/src/main.py", line 49, in <module>
    main()
  File "C:/py_spark/src/main.py", line 38, in main
    p1 = subprocess.run(job_module1.run_etl_01_job(spark, config))
  File "C:\ProgramData\Anaconda3\lib\subprocess.py", line 489, in run
    with Popen(*popenargs, **kwargs) as process:
  File "C:\ProgramData\Anaconda3\lib\subprocess.py", line 854, in __init__
    self._execute_child(args, executable, preexec_fn, close_fds,
  File "C:\ProgramData\Anaconda3\lib\subprocess.py", line 1247, in _execute_child
    args = list2cmdline(args)
  File "C:\ProgramData\Anaconda3\lib\subprocess.py", line 549, in list2cmdline
    for arg in map(os.fsdecode, seq):
TypeError: 'NoneType' object is not iterable
asked Apr 17 '21 by user7343922

1 Answer

The reason is that subprocess.Popen(job_module1.run_etl_01_job(spark, config)) is not a way to create a subprocess that runs job_module1.run_etl_01_job with arguments (spark, config). What you are doing here is running job_module1.run_etl_01_job(spark, config) locally (not in a different process), which returns None, and then calling subprocess.Popen(None), which in turn gives you the error you posted.
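For reference, here is a minimal sketch of how subprocess.Popen is normally called: it takes the command to execute (program plus arguments), not the result of a Python function call. The script name and config path below are hypothetical, and this approach would launch a separate Python interpreter (with its own SparkSession) per child, which is probably not what you want here:

import subprocess
import sys

# Hypothetical: run the child ETL script as a separate Python process.
# Popen receives the command as a list of strings, not a function call result.
proc = subprocess.Popen(
    [sys.executable, "etl_01_job.py", "C:/Pyspark/test/config/config.json"]
)
proc.wait()  # blocks until the child process exits
print("return code:", proc.returncode)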

What you probably meant to do is to use the multiprocessing module, as in:

from multiprocessing import Process

p1 = Process(target=job_module1.run_etl_01_job, args=(spark, config))
p2 = Process(target=job_module2.run_etl_02_job, args=(spark, config))
p1.start()
p2.start()
p1.join()
p2.join()

That said, I don't think this is going to work, since arguments passed to multiprocessing are pickled and I don't think a SparkSession object will survive pickling/unpickling. You might want to use the threading module here instead, or the even more convenient concurrent.futures module, like so:

from concurrent.futures import ThreadPoolExecutor

with ThreadPoolExecutor() as e:
  future1 = e.submit(job_module1.run_etl_01_job, spark, config)
  future2 = e.submit(job_module2.run_etl_02_job, spark, config)
  result1 = future1.result()
  result2 = future2.result()

This will run in one process and use multiple threads, but the actual Spark jobs will run in parallel (Spark remote execution releases Python's GIL, similar to IO operations, though some steps may still hold it, e.g. when the final result is merged at the Spark driver).
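Since you mentioned branching on the return code of each child process: with concurrent.futures there is no process return code, but future.result() re-raises any exception thrown inside the job, so you can handle success or failure per job. A sketch using the same hypothetical job functions as above:

from concurrent.futures import ThreadPoolExecutor

with ThreadPoolExecutor() as e:
    futures = {
        "etl_01_job": e.submit(job_module1.run_etl_01_job, spark, config),
        "etl_02_job": e.submit(job_module2.run_etl_02_job, spark, config),
    }
    for name, future in futures.items():
        try:
            future.result()  # re-raises any exception raised inside the job
            print(name, "finished successfully")
        except Exception as exc:
            print(name, "failed:", exc)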

answered Oct 31 '22 by Alexander Pivovarov