Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Why is there no speed-up when using pythons multiprocessing for embarassingly parallel problem within a for-loop, with shared numpy data?

I want to speed up an embarassingly parallel problem related to Bayesian Inference. The aim is to infer coefficents u for a set of images x, given a matrix A, such that X = A*U. X has dimensions mxn, A mxp and U pxn. For each column of X, one has to infer the optimal corresponding column of the coefficients U. In the end, this information is used to update A. I use m = 3000, p = 1500 and n = 100. So, as it is a linear model, the inference of the coefficient-matrix u consists of n independent calculations. Thus, I tried to work with the multiprocessing module of Python, but there is no speed up.

Here is what I did:

The main structure, without parallelization, is:

import numpy as np
from convex import Crwlasso_cd

S = np.empty((m, batch_size))

for t in xrange(start_iter, niter):

    ## Begin Warm Start ##
    # Take 5 gradient steps w/ this batch using last coef. to warm start inf.
    for ws in range(5):
        # Initialize the coefficients
        if ws:
            theta = U
        else:
            theta = np.dot(A.T, X)

        # Infer the Coefficients for the given data batch X of size mxn (n=batch_size)
        # Crwlasso_cd is the function that does the inference per data sample
        # It's basically a C-inline code
        for k in range(batch_size):
            U[:,k] = Crwlasso_cd(X[:, k].copy(), A, theta=theta[:,k].copy())

        # Given the inferred coefficients, update and renormalize
        # the basis functions A 
        dA1 = np.dot(X - np.dot(A, U), U.T) # Gaussian data likelihood
        A += (eta / batch_size) * dA1
        A = np.dot(A, np.diag(1/np.sqrt((A**2).sum(axis=0))))

Implementation of multiprocessing:

I tried to implement multiprocessing. I have an 8-core machine that I can use.

  1. There are 3 for-loops. The only one that seems to be "parallelizable" is the third one, where the coefficients are inferred:
    • Generate a Queue and stack the iteration-numbers from 0 to batch_size-1 into the Queue
    • Generate 8 processes, and let them work through the Queue
  2. Share the data U using multiprocessing.Array

So, I replaced this third loop with the following:

from multiprocessing import Process, Queue
import multiprocessing as mp
from Queue import Empty

num_cpu = mp.cpu_count()
work_queue = Queue()

# Generate the empty ndarray U and a multiprocessing.Array-Wrapper U_mp around U
# The class Wrap_mp is attached. Basically, U_mp.asarray() gives the corresponding
# ndarray
U = np.empty((p, batch_size))
U_mp = Wrap_mp(U)

...

        # Within the for-loops:
        for p in xrange(batch_size):
        work_queue.put(p)

        processes = [Process(target=infer_coefficients_mp, args=(work_queue,U_mp,A,X)) for p in range(num_cpu)]

        for p in processes:
            p.start()
            print p.pid
        for p in processes:
            p.join()

Here is the class Wrap_mp:

class Wrap_mp(object):
""" Wrapper around multiprocessing.Array to share an array across
    processes. Store the array as a multiprocessing.Array, but compute with it
as a numpy.ndarray
"""

    def __init__(self, arr):
        """ Initialize a shared array from a numpy array.

            The data is copied.
        """
        self.data = ndarray_to_shmem(arr)
        self.dtype = arr.dtype
        self.shape = arr.shape

    def __array__(self):
        """ Implement the array protocole.
        """
        arr = shmem_as_ndarray(self.data, dtype=self.dtype)
        arr.shape = self.shape
        return arr

    def asarray(self):
        return self.__array__()

And here is the function infer_coefficients_mp:

def infer_feature_coefficients_mp(work_queue,U_mp,A,X):

    while True:
        try:
            index = work_queue.get(block=False)
            x = X[:,index]
            U = U_mp.asarray()
            theta = np.dot(phit,x)

            # Infer the coefficients of the column index
            U[:,index] = Crwlasso_cd(x.copy(), A, theta=theta.copy())

         except Empty:
            break

The problem now are the following:

  1. The multiprocessing version is not faster than the single thread version for the given dimensions of the data.
  2. The process ID's increase with every iteration. Does this mean that there is constantly a new process generated? Doesn't this generate a huge overhead? How can I avoid that? Is there a possibility of creating within the whole for-loop 8 different processes and just update them with the data?
  3. Does the way I share the coefficients U amongst the processes slow the calculation down? Is there another, better way of doing this?
  4. Would a Pool of processes be better?

I am really thankful for any sort of help! I have started working with Python a month ago, and am pretty lost now.

Engin

like image 672
Engin Bumbacher Avatar asked Dec 07 '10 03:12

Engin Bumbacher


People also ask

Is multiprocessing faster Python?

This pattern is extremely common, and I illustrate it here with a toy stream processing application. On a machine with 48 physical cores, Ray is 6x faster than Python multiprocessing and 17x faster than single-threaded Python. Python multiprocessing doesn't outperform single-threaded Python on fewer than 24 cores.

Does Joblib use multiprocessing?

Old multiprocessing backendPrior to version 0.12, joblib used the 'multiprocessing' backend as default backend instead of 'loky' . This backend creates an instance of multiprocessing. Pool that forks the Python interpreter in multiple processes to execute each of the items of the list.

What is Python multiprocessing?

multiprocessing is a package that supports spawning processes using an API similar to the threading module. The multiprocessing package offers both local and remote concurrency, effectively side-stepping the Global Interpreter Lock by using subprocesses instead of threads.

How do you parallelize a code in Python?

First, we create two Process objects and assign them the function they will execute when they start running, also known as the target function. Second, we tell the processes to go ahead and run their tasks. And third, we wait for the processes to finish running, then continue with our program.


1 Answers

Every time you create a Process you are creating a new process. If you're doing that within your for loop, then yes, you are starting new processes every time through the loop. It sounds like what you want to do is initialize your Queue and Processes outside of the loop, then fill the Queue inside the loop.

I've used multiprocessing.Pool before, and it's useful, but it doesn't offer much over what you've already implemented with a Queue.

like image 191
Ted Mielczarek Avatar answered Oct 30 '22 08:10

Ted Mielczarek