I'm trying to call a list of PySpark modules dynamically from one main.py Python script, using importlib and subprocess. The child modules I'm trying to call do not return anything; they just do their ETL operations. I want my main.py program to wait until the child process completes. In the code below, every time I try to call the child process, I end up with the error "TypeError: 'NoneType' object is not iterable". One other problem: after initiating subprocess.Popen, I thought the flow would continue in main.py to the next line until it hits j1.wait(), but the immediate print statement (print("etl_01_job is running")) is not executing. Am I missing anything?
I googled and tried a lot of other ways, but nothing is working. Can anyone shed some light on what I am doing wrong? Once I'm able to successfully call the child process, I have to add a few other conditions based on the return code of the child process. But at this point, I want to fix this issue. Thanks.
main.py
import json
import importlib
import subprocess
from datetime import datetime
from pyspark import SparkContext, SparkConf
from pyspark.sql.session import SparkSession
def main():
    with open('C:/Pyspark/test/config/config.json', 'r') as config_file:
        config = json.load(config_file)

    spark = SparkSession.builder\
        .appName(config.get("app_name"))\
        .getOrCreate()

    job_module1 = importlib.import_module("etl_01_job")
    print("main calling time :", datetime.now())
    j1 = subprocess.Popen(job_module1.run_etl_01_job(spark, config))
    print("etl_01_job is running")
    j1.wait()  # I'm expecting main.py to wait until the child process finishes
    print("etl_01_job finished")

    job_module2 = importlib.import_module("etl_02_job")
    j2 = subprocess.Popen(job_module2.run_etl_02_job(spark, config))

if __name__ == "__main__":
    main()
Child PySpark job, etl_01_job.py (not the original code, just a sample script):
from datetime import datetime
import time
import sys
def etl_01_job(spark, config):
    print("I'm in 01etljob")
    print(config)
    print(config.get("app_name"))
    time.sleep(10)
    print("etljob 1 ending time :", datetime.now())

def run_etl_01_job(spark, config):
    etl_01_job(spark, config)
The error I'm getting is
Traceback (most recent call last):
  File "C:/py_spark/src/main.py", line 49, in <module>
    main()
  File "C:/py_spark/src/main.py", line 38, in main
    p1 = subprocess.run(job_module1.run_etl_01_job(spark, config))
  File "C:\ProgramData\Anaconda3\lib\subprocess.py", line 489, in run
    with Popen(*popenargs, **kwargs) as process:
  File "C:\ProgramData\Anaconda3\lib\subprocess.py", line 854, in __init__
    self._execute_child(args, executable, preexec_fn, close_fds,
  File "C:\ProgramData\Anaconda3\lib\subprocess.py", line 1247, in _execute_child
    args = list2cmdline(args)
  File "C:\ProgramData\Anaconda3\lib\subprocess.py", line 549, in list2cmdline
    for arg in map(os.fsdecode, seq):
TypeError: 'NoneType' object is not iterable
The reason is that subprocess.Popen(job_module1.run_etl_01_job(spark, config)) is not a way to create a subprocess that runs job_module1.run_etl_01_job with arguments (spark, config). What you are doing here is just running job_module1.run_etl_01_job(spark, config) locally (not in a different process), which returns None, and then calling subprocess.Popen(None), which in turn gives you the error you posted. This also explains why print("etl_01_job is running") never executes: the job runs to completion inside the Popen(...) line itself, and Popen then raises before control reaches the next line.
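A minimal sketch of what actually happens, with a hypothetical stand-in for the job function: the function call is evaluated first, in the parent process, and Popen then receives its None return value instead of a command:

```python
import subprocess
import sys

def run_job():
    # hypothetical stand-in for run_etl_01_job: does its work, returns None
    print("job ran in the parent process")

try:
    # run_job() executes here, in the parent; Popen is then handed None
    subprocess.Popen(run_job())
except TypeError as exc:
    print(exc)  # 'NoneType' object is not iterable

# By contrast, Popen expects a command, e.g. a new Python interpreter:
# subprocess.Popen([sys.executable, "etl_01_job.py"]).wait()
```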
What you probably meant to do is to use the multiprocessing module, as in:
from multiprocessing import Process
p1 = Process(target=job_module1.run_etl_01_job, args=(spark, config))
p2 = Process(target=job_module2.run_etl_02_job, args=(spark, config))
p1.start()
p2.start()
p1.join()
p2.join()
That said, I don't think it's going to work, since arguments to multiprocessing are pickled, and I don't think a SparkSession object will survive pickling/unpickling. You might instead try the threading module, or the even more convenient concurrent.futures module, like so:
from concurrent.futures import ThreadPoolExecutor
with ThreadPoolExecutor() as e:
    future1 = e.submit(job_module1.run_etl_01_job, spark, config)
    future2 = e.submit(job_module2.run_etl_02_job, spark, config)
    result1 = future1.result()
    result2 = future2.result()
This will run in one process using multiple threads, but the actual Spark jobs will run in parallel: Spark remote execution releases Python's GIL, similar to IO operations, though some steps may still hold it, e.g. when the final result is merged at the Spark driver.
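Since you mentioned follow-up conditions based on each job's return code, note that future.result() re-raises any exception the job raised, which can serve the same role as a return-code check. A sketch with hypothetical placeholder jobs (the real run_etl_01_job/run_etl_02_job and the actual spark/config objects would be used instead):

```python
from concurrent.futures import ThreadPoolExecutor

def run_etl_01_job(spark, config):
    # hypothetical placeholder: a job that succeeds
    return "done"

def run_etl_02_job(spark, config):
    # hypothetical placeholder: a job that fails
    raise RuntimeError("simulated failure")

spark, config = None, {}  # stand-ins for the real SparkSession and config dict

with ThreadPoolExecutor() as e:
    futures = {
        "etl_01_job": e.submit(run_etl_01_job, spark, config),
        "etl_02_job": e.submit(run_etl_02_job, spark, config),
    }

statuses = {}
for name, future in futures.items():
    try:
        future.result()  # blocks until done; re-raises if the job failed
        statuses[name] = "ok"
    except Exception as exc:
        statuses[name] = f"failed: {exc}"

print(statuses)  # {'etl_01_job': 'ok', 'etl_02_job': 'failed: simulated failure'}
```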