 

Dump intermediate results of multiprocessing job to filesystem and continue with processing later on

I have a job that uses the multiprocessing package and calls a function via

resultList = pool.map(myFunction, myListOfInputParameters).

Each entry in the list of input parameters is independent of the others.

This job will run for a couple of hours. For safety reasons, I would like to store the intermediate results at regular time intervals, e.g. once an hour.

How can I do this, and how can I continue processing from the last available backup if the job is aborted and restarted?
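For reference, here is a minimal runnable version of the setup described above; myFunction and the input list are just stand-ins for the real work:

from multiprocessing import Pool

def myFunction(param):
    # Stand-in for the real, independent per-item computation
    return param * 2

if __name__ == '__main__':
    myListOfInputParameters = list(range(10))
    with Pool() as pool:
        resultList = pool.map(myFunction, myListOfInputParameters)
    print(resultList)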

asked Jan 01 '19 by user7468395

People also ask

How do you stop a multiprocessing process?

A process can be killed by calling its terminate() method. The call only terminates the target process, not its child processes. The method is called on the multiprocessing.Process instance you want to stop.
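A minimal sketch of terminating a process:

from multiprocessing import Process
import time

def worker():
    # Loops forever until terminated from the parent
    while True:
        time.sleep(1)

if __name__ == '__main__':
    p = Process(target=worker)
    p.start()
    time.sleep(2)
    p.terminate()  # kills only this process, not its children
    p.join()
    print(p.exitcode)  # negative signal number, e.g. -15 for SIGTERM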

What is Chunksize in multiprocessing?

“chunksize” is an argument to multiprocessing pool methods such as Pool.map() that controls how many tasks are grouped together and sent to each worker process at a time when issuing many tasks.
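For example, a sketch of passing chunksize to Pool.map():

from multiprocessing import Pool

def square(x):
    return x * x

if __name__ == '__main__':
    with Pool(4) as pool:
        # Ship the 1000 inputs to the workers in batches of 50
        # instead of one at a time, reducing IPC overhead
        results = pool.map(square, range(1000), chunksize=50)
    print(results[:5])  # [0, 1, 4, 9, 16]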

How does multiprocessing process work?

multiprocessing is a package that supports spawning processes using an API similar to the threading module. The multiprocessing package offers both local and remote concurrency, effectively side-stepping the Global Interpreter Lock by using subprocesses instead of threads.
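A minimal sketch of that threading-like API:

from multiprocessing import Process

def greet(name):
    print("hello from", name)

if __name__ == '__main__':
    # Same start()/join() interface as threading.Thread, but
    # greet() runs in a separate process with its own interpreter
    p = Process(target=greet, args=("worker",))
    p.start()
    p.join()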

What is multiprocessing dummy?

The multiprocessing.dummy module provides a wrapper around the multiprocessing module, implemented using thread-based concurrency. It is a drop-in replacement for multiprocessing, allowing a program that uses the multiprocessing API to switch to threads with a single change to its import statements.
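For example, a thread-based Pool needs only a changed import:

# from multiprocessing import Pool
from multiprocessing.dummy import Pool  # thread-based Pool, same API

def double(x):
    return x * 2

if __name__ == '__main__':
    with Pool(4) as pool:
        print(pool.map(double, range(8)))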


1 Answer

Perhaps use pickle. Read more here:

https://docs.python.org/3/library/pickle.html

Based on aws_apprentice's comment I created a full multiprocessing example in case you weren't sure how to use intermediate results. The first time this is run it will print None for each process, since no intermediate results exist yet. Run it again to simulate restarting from the saved state.

from multiprocessing import Process
import pickle

def proc(name):
    data = None

    # Load intermediate results from a previous run, if any exist
    try:
        with open(name + '.pkl', 'rb') as f:
            data = pickle.load(f)
    except FileNotFoundError:
        pass

    # Do something with the previous state
    print(data)
    data = "intermediate result for " + name

    # Periodically save your intermediate results
    with open(name + '.pkl', 'wb') as f:
        pickle.dump(data, f, -1)

if __name__ == '__main__':
    processes = []
    for x in range(5):
        p = Process(target=proc, args=("proc" + str(x),))
        p.daemon = True
        p.start()
        processes.append(p)

    for process in processes:
        process.join()

You can also use json if it makes sense to store the intermediate results in a human-readable format, or sqlite if you need to push the data into database rows.
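For instance, a minimal sketch of the same checkpoint pattern using json; the helper names here are illustrative, not part of the answer:

import json

def save_checkpoint(path, results):
    # Human-readable checkpoint; assumes results are JSON-serializable
    with open(path, 'w') as f:
        json.dump(results, f)

def load_checkpoint(path):
    # Returns an empty dict when no checkpoint exists yet
    try:
        with open(path) as f:
            return json.load(f)
    except FileNotFoundError:
        return {}

if __name__ == '__main__':
    done = load_checkpoint('results.json')
    done['proc0'] = 'intermediate result for proc0'
    save_checkpoint('results.json', done)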

answered Oct 21 '22 by MarkReedZ