I am trying to run Jupyter notebooks in parallel by starting them from another notebook. I'm using papermill to save the output from the notebooks.
In my scheduler.ipynb I'm using multiprocessing, which is what some people have had success with. I create processes from a base notebook, and this always seems to work the first time it's run: I can run 3 notebooks with sleep 10 in 13 seconds. If a subsequent cell attempts to run the exact same thing, the processes it spawns (multiple notebooks) hang indefinitely. I've tried adding code to make sure the spawned processes have exit codes and have completed, even calling terminate on them once they are done. No luck: my second attempt never completes.
If I do:
sean@server:~$ ps aux | grep ipython
root 2129 0.1 0.2 1117652 176904 ? Ssl 19:39 0:05 /opt/conda/anaconda/bin/python /opt/conda/anaconda/bin/ipython kernel -f /root/.local/share/jupyter/runtime/kernel-eee374ff-0760-4490-8ed0-db03fed84f0c.json
root 3418 0.1 0.2 1042076 173652 ? Ssl 19:42 0:03 /opt/conda/anaconda/bin/python /opt/conda/anaconda/bin/ipython kernel -f /root/.local/share/jupyter/runtime/kernel-3e2f09e8-969f-41c9-81cc-acd2ec4e3d54.json
root 4332 0.1 0.2 1042796 174896 ? Ssl 19:44 0:04 /opt/conda/anaconda/bin/python /opt/conda/anaconda/bin/ipython kernel -f /root/.local/share/jupyter/runtime/kernel-bbd4575c-109a-4fb3-b6ed-372beb27effd.json
root 17183 0.2 0.2 995344 145872 ? Ssl 20:26 0:02 /opt/conda/anaconda/bin/python /opt/conda/anaconda/bin/ipython kernel -f /root/.local/share/jupyter/runtime/kernel-27c48eb1-16b4-4442-9574-058283e48536.json
I see that there appear to be 4 running kernels (4 processes). When I view the running notebooks, I see 6 of them. The docs seem to support this, in that a few kernels can service multiple notebooks: "A kernel process can be connected to more than one frontend simultaneously."
But I suspect that, because the ipython kernels continue to run, something bad is happening and the spawned processes aren't being reaped. Some say it's not possible using multiprocessing. Others have described the same problem.
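To check whether kernels really pile up between runs, it may help to compare the kernel IDs in the ps output before and after each attempt. This is only a rough sketch: the function name kernel_ids_from_ps is made up for illustration, and it assumes the kernel-<id>.json connection-file naming and the ps aux column layout shown in the listing above.

```python
import re

# Matches the connection file name seen in the ps listing, e.g.
# kernel-eee374ff-0760-4490-8ed0-db03fed84f0c.json
KERNEL_FILE = re.compile(r"kernel-([0-9a-f-]+)\.json")


def kernel_ids_from_ps(ps_output):
    """Return (pid, kernel_id) pairs for every kernel line in `ps aux` output.

    Hypothetical helper: assumes the PID is the second column, as in `ps aux`.
    """
    kernels = []
    for line in ps_output.splitlines():
        match = KERNEL_FILE.search(line)
        if match is None:
            continue  # not a kernel process line
        pid = int(line.split()[1])
        kernels.append((pid, match.group(1)))
    return kernels
```

Running this on the ps output captured before and after the second attempt would show which kernel processes are new and which ones never went away.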
import re
import os
import multiprocessing
from os.path import isfile
from datetime import datetime
import papermill as pm
import nbformat
# avoid "RuntimeError: This event loop is already running"
# it seems papermill used to support this but it is now undocumented:
# papermill.execute_notebook(nest_asyncio=True)
import nest_asyncio
nest_asyncio.apply()
import company.config
# # Supporting Functions
# In[ ]:
def get_papermill_parameters(notebook,
                             notebook_prefix='/mnt/jupyter',
                             notebook_suffix='.ipynb'):
    if isinstance(notebook, list):
        notebook_path = notebook[0]
        parameters = notebook[1]
        # fall back to '' (not None) so the tag can be concatenated into the output path
        tag = '_' + notebook[2] if notebook[2] is not None else ''
    else:
        notebook_path = notebook
        parameters = None
        tag = ''

    basename = os.path.basename(notebook_path)
    dirpath = re.sub(basename + '$', '', notebook_path)
    this_notebook_suffix = notebook_suffix if not re.search(notebook_suffix + '$', basename) else ''

    input_notebook = notebook_prefix + notebook_path + this_notebook_suffix
    scheduler_notebook_dir = notebook_prefix + dirpath + 'scheduler/'
    if not os.path.exists(scheduler_notebook_dir):
        os.makedirs(scheduler_notebook_dir)
    output_notebook = scheduler_notebook_dir + basename
    return input_notebook, output_notebook, this_notebook_suffix, parameters, tag
# In[ ]:
def add_additional_imports(input_notebook, output_notebook, current_datetime):
    notebook_name = os.path.basename(output_notebook)
    notebook_dir = re.sub(notebook_name, '', output_notebook)
    temp_dir = notebook_dir + current_datetime + '/temp/'
    results_dir = notebook_dir + current_datetime + '/'
    if not os.path.exists(temp_dir):
        os.makedirs(temp_dir)
    if not os.path.exists(results_dir):
        os.makedirs(results_dir)
    updated_notebook = temp_dir + notebook_name

    # cell that gets prepended to the notebook before it is run
    first_cell = nbformat.v4.new_code_cell("""
import import_ipynb
import sys
sys.path.append('/mnt/jupyter/lib')""")
    metadata = {"kernelspec": {"display_name": "PySpark", "language": "python", "name": "pyspark"}}

    existing_nb = nbformat.read(input_notebook, nbformat.current_nbformat)
    cells = existing_nb.cells
    cells.insert(0, first_cell)
    new_nb = nbformat.v4.new_notebook(cells=cells, metadata=metadata)
    nbformat.write(new_nb, updated_notebook, nbformat.current_nbformat)

    output_notebook = results_dir + notebook_name
    return updated_notebook, output_notebook
# In[ ]:
# define this function so it is easily passed to multiprocessing
def run_papermill(input_notebook, output_notebook, parameters):
    pm.execute_notebook(input_notebook, output_notebook, parameters, log_output=True)
# # Run All of the Notebooks
# In[ ]:
def run(notebooks, run_hour_utc=10, scheduler=True, additional_imports=False,
        parallel=False, notebook_prefix='/mnt/jupyter'):
    """
    Run provided list of notebooks on a schedule or on demand.

    Args:
        notebooks (list): a list of notebooks to run
        run_hour_utc (int): hour to run notebooks at
        scheduler (boolean): when set to True (default value) notebooks will run at run_hour_utc.
                             when set to False notebooks will run on demand.
        additional_imports (boolean): set to True if you need to add additional imports into your notebook
        parallel (boolean): whether to run the notebooks in parallel
        notebook_prefix (str): path to jupyter notebooks
    """
    if not scheduler or datetime.now().hour == run_hour_utc:  # Only run once a day on an hourly cron job.
        now = datetime.today().strftime('%Y-%m-%d_%H%M%S')
        procs = []
        notebooks_base_url = company.config.cluster['resources']['daedalus']['notebook'] + '/notebooks'

        if parallel and len(notebooks) > 10:
            # f-string prefix added so the notebook count is actually interpolated
            raise Exception(f"You are trying to run {len(notebooks)}. We recommend a maximum of 10 be run at once.")

        for notebook in notebooks:
            input_notebook, output_notebook, this_notebook_suffix, parameters, tag = get_papermill_parameters(notebook, notebook_prefix)

            # is_interactive_notebook is not defined in the code shown here
            if is_interactive_notebook(input_notebook):
                print(f"Not running Notebook '{input_notebook}' because it's marked interactive-only.")
                continue

            if additional_imports:
                input_notebook, output_notebook = add_additional_imports(input_notebook, output_notebook, now)
            else:
                output_notebook = output_notebook + tag + '_' + now + this_notebook_suffix

            print(f"Running Notebook: '{input_notebook}'")
            print(" - Parameters: " + str(parameters))
            print(f"Saving Results to: '{output_notebook}'")
            print("Link: " + re.sub(notebook_prefix, notebooks_base_url, output_notebook))

            if not os.path.isfile(input_notebook):
                print(f"ERROR! Notebook file does not exist: '{input_notebook}'")
            else:
                try:
                    if parameters is not None:
                        parameters.update({'input_notebook': input_notebook, 'output_notebook': output_notebook})
                    if parallel:
                        # trailing comma in args is in documentation for multiprocessing- it seems to matter
                        proc = multiprocessing.Process(target=run_papermill, args=(input_notebook, output_notebook, parameters,))
                        print("starting process")
                        proc.start()
                        procs.append(proc)
                    else:
                        run_papermill(input_notebook, output_notebook, parameters)
                except Exception as ex:
                    print(ex)
                    print(f"ERROR! See full error in: '{output_notebook}'\n\n")

            if additional_imports:
                temp_dir = re.sub(os.path.basename(input_notebook), '', input_notebook)
                if os.path.exists(temp_dir):
                    os.system(f"rm -rf '{temp_dir}'")

        if procs:
            print("joining")
            for proc in procs:
                proc.join()

        if procs:
            print("terminating")
            for proc in procs:
                print(proc.is_alive())
                print(proc.exitcode)
                proc.terminate()

        print(f"Done: Processed all {len(notebooks)} notebooks.")
    else:
        print(f"Waiting until {run_hour_utc}:00:00 UTC to run.")
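Since proc.join() with no timeout blocks forever when a child hangs, one thing that might make the second run easier to diagnose is joining against a deadline and reporting which processes had to be terminated. A minimal sketch, assuming the objects passed in are multiprocessing.Process instances that have already been started; the helper name join_with_timeout is hypothetical. It does not fix the hang, it only stops the scheduler cell from blocking on it.

```python
import time


def join_with_timeout(procs, timeout_seconds):
    """Join started processes against one shared deadline.

    Hypothetical helper: any process still alive at the deadline is
    terminated. Returns a list of (name, exitcode, timed_out) tuples.
    """
    deadline = time.monotonic() + timeout_seconds
    report = []
    for proc in procs:
        remaining = max(0.0, deadline - time.monotonic())
        proc.join(remaining)          # returns early if the child has exited
        timed_out = proc.is_alive()   # still running means the join timed out
        if timed_out:
            proc.terminate()
            proc.join()               # reap the terminated child
        report.append((proc.name, proc.exitcode, timed_out))
    return report
```

In the run function above, this could stand in for the separate "joining" and "terminating" loops, for example report = join_with_timeout(procs, 600), followed by printing any entries whose timed_out flag is True.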
I'm using python==3.6.12, papermill==2.2.2
jupyter core : 4.7.0
jupyter-notebook : 5.5.0
ipython : 7.16.1
ipykernel : 5.3.4
jupyter client : 6.1.7
ipywidgets : 7.2.1
Have you tried using the subprocess module? It seems like a better option for you than multiprocessing. It lets you spawn sub-processes asynchronously so that they run in parallel, and it can invoke commands and programs as if you were using the shell. I find it really useful for writing Python scripts instead of bash scripts.
So you could use your main notebook to run your other notebooks as independent sub-processes in parallel. Note that subprocess takes a command rather than a Python function, and that subprocess.run waits for the command to finish, so for parallel runs you would start each notebook with subprocess.Popen (for example, by calling the papermill command-line tool) and wait on all of them afterwards.
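Here is a rough sketch of what that could look like. It assumes the papermill command-line tool is on the PATH and is invoked as papermill INPUT OUTPUT -p NAME VALUE; the helper names build_papermill_command and run_commands_in_parallel are made up for illustration, and parameter values are passed as plain strings.

```python
import subprocess


def build_papermill_command(input_notebook, output_notebook, parameters=None):
    """Build the argv list for one papermill CLI run (hypothetical helper)."""
    command = ["papermill", input_notebook, output_notebook]
    for name, value in (parameters or {}).items():
        command += ["-p", name, str(value)]
    return command


def run_commands_in_parallel(commands):
    """Start every command at once, then wait for all of them.

    Returns the exit codes in the same order as `commands`.
    """
    procs = [subprocess.Popen(command) for command in commands]
    return [proc.wait() for proc in procs]
```

From the scheduler notebook you would build one command per notebook and pass the list to run_commands_in_parallel; a non-zero exit code marks a run that failed. Each run is then an independent OS process started from a command line, rather than a fork of the scheduler's own kernel process.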