Python on AWS Lambda does not support multiprocessing.Pool.map()
, as documented in this other question. Please note that the other question was asking why it doesn't work. This question is different, I'm asking how to emulate the functionality given the lack of underlying support.
One of the answers to that other question gave us this code:
# Python 3.6
from multiprocessing import Pipe, Process
def myWorkFunc(data, connection):
result = None
# Do some work and store it in result
if result:
connection.send([result])
else:
connection.send([None])
def myPipedMultiProcessFunc():
# Get number of available logical cores
plimit = multiprocessing.cpu_count()
# Setup management variables
results = []
parent_conns = []
processes = []
pcount = 0
pactive = []
i = 0
for data in iterable:
# Create the pipe for parent-child process communication
parent_conn, child_conn = Pipe()
# create the process, pass data to be operated on and connection
process = Process(target=myWorkFunc, args=(data, child_conn,))
parent_conns.append(parent_conn)
process.start()
pcount += 1
if pcount == plimit: # There is not currently room for another process
# Wait until there are results in the Pipes
finishedConns = multiprocessing.connection.wait(parent_conns)
# Collect the results and remove the connection as processing
# the connection again will lead to errors
for conn in finishedConns:
results.append(conn.recv()[0])
parent_conns.remove(conn)
# Decrement pcount so we can add a new process
pcount -= 1
# Ensure all remaining active processes have their results collected
for conn in parent_conns:
results.append(conn.recv()[0])
conn.close()
# Process results as needed
Can this sample code be modified to support multiprocessing.Pool.map()
?
What have I tried so far
I analysed the above code and I do not see a parameter for the function to be executed or the data, so I'm inferring that it does not perform the same function as multiprocessing.Pool.map()
. It is not clear what the code does, other than demonstrating the building blocks that could be assembled into a solution.
Is this a "write my code for me" question?
Yes to some extent, it is. This issue impacts thousands of Python developers, and it would be far more efficient for the world economy, less green-house gas emissions, etc if all of us share the same code, instead of forcing every SO user who encounters this to go and develop their own workaround. I hope I've done my part by distilling this into a clear question with the presumed building blocks ready to go.
Due to the Lambda execution environment not having /dev/shm (shared- memory for processes) support, you can't use multiprocessing.
Using multithreading in AWS Lambda can speed up your Lambda execution and reduce cost as Lambda charges in 100 ms unit.
It works like a map-reduce architecture. It maps the input to the different processors and collects the output from all the processors. After the execution of code, it returns the output in form of a list or array. It waits for all the tasks to finish and then returns the output.
Python packages that contain compiled code (for example: NumPy and pandas) aren't always compatible with Lambda runtimes by default. If you install these packages using pip, then the packages download and compile a module-name package for the architecture of the local machine.
I was able to get this working for my own tests. I've based my code on this link : https://aws.amazon.com/blogs/compute/parallel-processing-in-python-with-aws-lambda/
NB1: you MUST increase memory allocation to the lambda function. with the default minimal amount, there's no increase in performance with multiprocessing. With the maximum my account can allocate (3008MB) the figures below were attained.
NB2: I'm completely ignoring max processes in parallel here. My usage doesn't have a whole lot of elements to work on.
with the code below, usage is:
work = funcmap(yourfunction,listofstufftoworkon)
yourresults = work.run()
running from my laptop:
jumper@jumperdebian[3333] ~/scripts/tmp 2019-09-04 11:52:30
└─ $ ∙ python3 -c "import tst; tst.lambda_handler(None,None)"
results : [(35, 9227465), (35, 9227465), (35, 9227465), (35, 9227465)]
SP runtime : 9.574460506439209
results : [(35, 9227465), (35, 9227465), (35, 9227465), (35, 9227465)]
MP runtime : 6.422513484954834
running from aws:
Function Logs:
START RequestId: 075a92c0-7c4f-4f48-9820-f394ee899a97 Version: $LATEST
results : [(35, 9227465), (35, 9227465), (35, 9227465), (35, 9227465)]
SP runtime : 12.135798215866089
results : [(35, 9227465), (35, 9227465), (35, 9227465), (35, 9227465)]
MP runtime : 7.293526887893677
END RequestId: 075a92c0-7c4f-4f48-9820-f394ee899a97
Here's the test code:
import time
from multiprocessing import Process, Pipe
import boto3
class funcmap(object):
fmfunction=None
fmlist=None
def __init__(self,pfunction,plist):
self.fmfunction=pfunction
self.fmlist=plist
def calculation(self, pfunction, pload, conn):
panswer=pfunction(pload)
conn.send([pload,panswer])
conn.close()
def run(self):
datalist = self.fmlist
processes = []
parent_connections = []
for datum in datalist:
parent_conn, child_conn = Pipe()
parent_connections.append(parent_conn)
process = Process(target=self.calculation, args=(self.fmfunction, datum, child_conn,))
processes.append(process)
pstart=time.time()
for process in processes:
process.start()
#print("starting at t+ {} s".format(time.time()-pstart))
for process in processes:
process.join()
#print("joining at t+ {} s".format(time.time()-pstart))
results = []
for parent_connection in parent_connections:
resp=parent_connection.recv()
results.append((resp[0],resp[1]))
return results
def fibo(n):
if n <= 2 : return 1
return fibo(n-1)+fibo(n-2)
def lambda_handler(event, context):
#worklist=[22,23,24,25,26,27,28,29,30,31,32,31,30,29,28,27,26,27,28,29]
#worklist=[22,23,24,25,26,27,28,29,30]
worklist=[30,30,30,30]
#worklist=[30]
_start = time.time()
results=[]
for a in worklist:
results.append((a,fibo(a)))
print("results : {}".format(results))
_end = time.time()
print("SP runtime : {}".format(_end-_start))
_mstart = time.time()
work = funcmap(fibo,worklist)
results = work.run()
print("results : {}".format(results))
_mend = time.time()
print("MP runtime : {}".format(_mend-_mstart))
hope it helps.
I had the same issue, and ended up implementing my own simple wrapper around multiprocessing.Pool
. Definitely not bullet proof, but enough for simple use cases as drop-in replacement.
https://stackoverflow.com/a/63633248/158049
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With