Hung cells: running multiple jupyter notebooks in parallel with papermill

I am trying to run jupyter notebooks in parallel by starting them from another notebook. I'm using papermill to save the output from the notebooks.

In my scheduler.ipynb I’m using multiprocessing, which some people have had success with. I create processes from a base notebook, and this always seems to work the first time it’s run: I can run 3 notebooks that each sleep for 10 seconds in about 13 seconds total. If a subsequent cell attempts to run the exact same thing, the processes it spawns (multiple notebooks) hang indefinitely. I’ve tried adding code to check that the spawned processes have exit codes and have completed, and even calling terminate on them once they are done. No luck: my second attempt never completes.

If I do:

sean@server:~$ ps aux | grep ipython 
root      2129  0.1  0.2 1117652 176904 ?      Ssl  19:39   0:05 /opt/conda/anaconda/bin/python /opt/conda/anaconda/bin/ipython kernel -f /root/.local/share/jupyter/runtime/kernel-eee374ff-0760-4490-8ed0-db03fed84f0c.json
root      3418  0.1  0.2 1042076 173652 ?      Ssl  19:42   0:03 /opt/conda/anaconda/bin/python /opt/conda/anaconda/bin/ipython kernel -f /root/.local/share/jupyter/runtime/kernel-3e2f09e8-969f-41c9-81cc-acd2ec4e3d54.json
root      4332  0.1  0.2 1042796 174896 ?      Ssl  19:44   0:04 /opt/conda/anaconda/bin/python /opt/conda/anaconda/bin/ipython kernel -f /root/.local/share/jupyter/runtime/kernel-bbd4575c-109a-4fb3-b6ed-372beb27effd.json
root     17183  0.2  0.2 995344 145872 ?       Ssl  20:26   0:02 /opt/conda/anaconda/bin/python /opt/conda/anaconda/bin/ipython kernel -f /root/.local/share/jupyter/runtime/kernel-27c48eb1-16b4-4442-9574-058283e48536.json

I see that there appear to be 4 running kernels (4 processes). When I view the running notebooks, I see 6 of them. This seems consistent with the docs, which suggest that a few kernels can service multiple notebooks: “A kernel process can be connected to more than one frontend simultaneously.”

But because the ipython kernels continue to run, I suspect something bad is happening and the spawned processes aren’t being reaped. Some say this isn’t possible using multiprocessing. Others have described the same problem.

import re
import os
import multiprocessing

from os.path import isfile
from datetime import datetime

import papermill as pm
import nbformat

# avoid "RuntimeError: This event loop is already running"
# it seems papermill used to support this but it is now undocumented: 
#  papermill.execute_notebook(nest_asyncio=True)
import nest_asyncio
nest_asyncio.apply()

import company.config


# # Supporting Functions

# In[ ]:


def get_papermill_parameters(notebook,
                             notebook_prefix='/mnt/jupyter',
                             notebook_suffix='.ipynb'):
  if isinstance(notebook, list):
    notebook_path = notebook[0]
    parameters = notebook[1]
    tag = '_' + notebook[2] if notebook[2] is not None else ''
  else:
    notebook_path = notebook
    parameters = None
    tag = ''
    
  basename = os.path.basename(notebook_path)
  dirpath = re.sub(re.escape(basename) + '$', '', notebook_path)
  this_notebook_suffix = '' if basename.endswith(notebook_suffix) else notebook_suffix
  
  input_notebook = notebook_prefix + notebook_path + this_notebook_suffix
  scheduler_notebook_dir = notebook_prefix + dirpath + 'scheduler/'
  
  if not os.path.exists(scheduler_notebook_dir):
    os.makedirs(scheduler_notebook_dir)
    
  output_notebook = scheduler_notebook_dir + basename 
  
  return input_notebook, output_notebook, this_notebook_suffix, parameters, tag


# In[ ]:


def add_additional_imports(input_notebook, output_notebook, current_datetime):          
  notebook_name = os.path.basename(output_notebook) 
  notebook_dir = re.sub(re.escape(notebook_name) + '$', '', output_notebook)
  temp_dir = notebook_dir + current_datetime + '/temp/'
  results_dir = notebook_dir + current_datetime + '/'
  
  if not os.path.exists(temp_dir):
    os.makedirs(temp_dir)
  if not os.path.exists(results_dir):
    os.makedirs(results_dir) 
    
  updated_notebook = temp_dir + notebook_name 
  first_cell = nbformat.v4.new_code_cell("""
    import import_ipynb
    import sys
    sys.path.append('/mnt/jupyter/lib')""")
        
  metadata = {"kernelspec": {"display_name": "PySpark", "language": "python", "name": "pyspark"}}
  existing_nb = nbformat.read(input_notebook, nbformat.current_nbformat)
  cells = existing_nb.cells
  cells.insert(0, first_cell)
  new_nb = nbformat.v4.new_notebook(cells = cells, metadata = metadata)
  nbformat.write(new_nb, updated_notebook, nbformat.current_nbformat)
  output_notebook = results_dir + notebook_name
  
  return updated_notebook, output_notebook


# In[ ]:


# define this function so it is easily passed to multiprocessing
def run_papermill(input_notebook, output_notebook, parameters):
  pm.execute_notebook(input_notebook, output_notebook, parameters, log_output=True)


# # Run All of the Notebooks

# In[ ]:


def run(notebooks, run_hour_utc=10, scheduler=True, additional_imports=False,
        parallel=False, notebook_prefix='/mnt/jupyter'):
  """
  Run provided list of notebooks on a schedule or on demand.

  Args:
    notebooks (list): a list of notebooks to run
    run_hour_utc (int): hour to run notebooks at
    scheduler (boolean): when set to True (default value) notebooks will run at run_hour_utc.
                         when set to False notebooks will run on demand.
    additional_imports (boolean): set to True if you need to add additional imports into your notebook
    parallel (boolean): whether to run the notebooks in parallel
    notebook_prefix (str): path to jupyter notebooks
  """
  if not scheduler or datetime.now().hour == run_hour_utc:  # Only run once a day on an hourly cron job.
    now = datetime.today().strftime('%Y-%m-%d_%H%M%S')
    procs = []
    notebooks_base_url = company.config.cluster['resources']['daedalus']['notebook'] + '/notebooks'
    
    if parallel and len(notebooks) > 10:
      raise Exception(f"You are trying to run {len(notebooks)} notebooks. We recommend a maximum of 10 be run at once.")

    for notebook in notebooks:
      input_notebook, output_notebook, this_notebook_suffix, parameters, tag = get_papermill_parameters(notebook, notebook_prefix)
      
      if is_interactive_notebook(input_notebook):
        print(f"Not running Notebook '{input_notebook}' because it's marked interactive-only.")
        continue
      
      if additional_imports:
        input_notebook, output_notebook = add_additional_imports(input_notebook, output_notebook, now)
      else:
        output_notebook = output_notebook + tag + '_' + now + this_notebook_suffix
      
      print(f"Running Notebook: '{input_notebook}'")
      print(" - Parameters: " + str(parameters))
      print(f"Saving Results to: '{output_notebook}'")
      print("Link: " + re.sub(notebook_prefix, notebooks_base_url, output_notebook))
    
      if not os.path.isfile(input_notebook):
        print(f"ERROR! Notebook file does not exist: '{input_notebook}'")
      else:
        try:
          if parameters is not None:
            parameters.update({'input_notebook':input_notebook, 'output_notebook':output_notebook})
          if parallel:
            # args must be a tuple; the trailing comma only matters for a one-element tuple
            proc = multiprocessing.Process(target=run_papermill, args=(input_notebook, output_notebook, parameters,))
            print("starting process")
            proc.start()
            procs.append(proc)
            
          else:
            run_papermill(input_notebook, output_notebook, parameters)
            
        except Exception as ex:
          print(ex)
          print(f"ERROR! See full error in: '{output_notebook}'\n\n")
          
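      # NOTE: with parallel=True the child process may not have read the temp
      # notebook yet when this cleanup runs right after proc.start()
      if additional_imports:
        temp_dir = re.sub(re.escape(os.path.basename(input_notebook)) + '$', '', input_notebook)
        if os.path.exists(temp_dir):
          os.system(f"rm -rf '{temp_dir}'")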
    
    if procs:
      print("joining")
      for proc in procs:
        proc.join()
    
    if procs:
      print("terminating")
      for proc in procs:
        print(proc.is_alive())
        print(proc.exitcode)
        proc.terminate()
    
    print(f"Done: Processed all {len(notebooks)} notebooks.")
    
  else:
    print(f"Waiting until {run_hour_utc}:00:00 UTC to run.")

I'm using python==3.6.12, papermill==2.2.2

jupyter core     : 4.7.0
jupyter-notebook : 5.5.0
ipython          : 7.16.1
ipykernel        : 5.3.4
jupyter client   : 6.1.7
ipywidgets       : 7.2.1
asked Jan 05 '21 00:01 by sean5446


1 Answer

Have you tried using the subprocess module? It seems like a better option for you than multiprocessing. It lets you spawn sub-processes asynchronously so that they run in parallel, and you can use it to invoke commands and programs as if you were using the shell. I find it really useful for writing python scripts instead of bash scripts.

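So you could use your main notebook to run your other notebooks as independent sub-processes in parallel, for example by starting one papermill command per notebook with subprocess.Popen. Note that subprocess.run takes a command line rather than a Python function and blocks until that command finishes, so on its own it would run the notebooks one at a time.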
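A minimal sketch of that idea, under some assumptions: papermill is installed in the same environment as the scheduler kernel (so `python -m papermill` works), and parameter values are simple enough to pass as strings with `-p`. The helper names `build_papermill_command` and `run_commands_in_parallel` are made up for this illustration and are not part of papermill. Each child is a fresh interpreter, so it does not inherit the scheduler kernel's event loop or sockets; whether that is what causes the hang in the question is not confirmed here.

```python
import subprocess
import sys
import time


def build_papermill_command(input_notebook, output_notebook, parameters=None):
    """Build a `python -m papermill` command line for one notebook (hypothetical helper)."""
    command = [sys.executable, "-m", "papermill",
               input_notebook, output_notebook, "--log-output"]
    # Each parameter becomes `-p NAME VALUE`; values travel as strings.
    for name, value in (parameters or {}).items():
        command += ["-p", str(name), str(value)]
    return command


def run_commands_in_parallel(commands, timeout=None):
    """Start every command at once, then wait for all of them.

    Returns the exit codes in the same order as `commands`. Any command
    still running `timeout` seconds after launch is killed and reported
    as None, so a hung child cannot block the caller forever.
    """
    procs = [subprocess.Popen(command) for command in commands]
    deadline = None if timeout is None else time.monotonic() + timeout
    exit_codes = []
    for proc in procs:
        remaining = None
        if deadline is not None:
            remaining = max(0.0, deadline - time.monotonic())
        try:
            exit_codes.append(proc.wait(timeout=remaining))
        except subprocess.TimeoutExpired:
            proc.kill()
            proc.wait()  # reap the killed child
            exit_codes.append(None)
    return exit_codes
```

In the scheduler from the question, the parallel branch could collect one command per notebook with `build_papermill_command(input_notebook, output_notebook, parameters)` and call `run_commands_in_parallel(commands, timeout=...)` once after the loop. A non-zero exit code means papermill reported a failure for that notebook, and None means the notebook was killed after the timeout.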

answered Oct 25 '22 18:10 by barzilay