Using Python Multiprocessing Queue Inside AWS Lambda Function

I have some Python code that creates multiple processes to complete a task much more quickly. When I create these processes I pass in a queue, and inside each process I call queue.put(data) so I can retrieve the data outside of the processes. It works fantastically on my local machine, but when I upload the zip to an AWS Lambda function (Python 3.8) it says the Queue() function has not been implemented. The project runs great in the AWS Lambda when I simply take out the queue functionality, so I know this is the only hang-up I currently have.

I made sure to install the multiprocessing package directly into my Python project using "pip install multiprocess -t ./", as well as "pip install boto3 -t ./".

I am new to Python specifically, as well as AWS, but the research I have come across recently potentially points me to SQS.

Reading over the SQS docs, I am not sure if this is exactly what I am looking for.

Here is the code I am running in the Lambda that works locally but not on AWS. See the *'s for important pieces:

from multiprocessing import Process, Queue
from craigslist import CraigslistForSale
import time
import math

sitesHold = ["sfbay", "seattle", "newyork", "(many more)..." ]

results = []


def f(sites, category, search_keys, queue):
    local_results = []
    for site in sites:
        cl_fs = CraigslistForSale(site=site, category=category, filters={'query': search_keys})
        for result in cl_fs.get_results(sort_by='newest'):
            local_results.append(result)
    if len(local_results) > 0:
        print(local_results)
    queue.put(local_results) # Putting data *********************************


def scan_handler(event, context):
    started_at = time.monotonic()
    queue = Queue()
    print("Running...")
    amount_of_lists = int(event['amountOfLists'])
    list_length = int(len(sitesHold) / amount_of_lists)
    extra_lists = math.ceil((len(sitesHold) - (amount_of_lists * list_length)) / list_length)
    site_list = []
    list_creator_counter = 0
    site_counter = 0
    for i in range(amount_of_lists + extra_lists):
        site_list.append(sitesHold[list_creator_counter:list_creator_counter + list_length])
        list_creator_counter += list_length
    processes = []
    for i in range(len(site_list)):
        site_counter = site_counter + len(site_list[i])
        processes.append(Process(target=f, args=(site_list[i], event['category'], event['searchQuery'], queue,))) # Creating processes and creating queues ***************************

    for process in processes:
        process.start() # Starting processes ***********************

    for process in processes:
        listings = queue.get() # Getting from queue ****************************
        if len(listings) > 0:
            for listing in listings:
                results.append(listing)

    print(f"Results: {results}")

    for process in processes:
        process.join()

    total_time_took = time.monotonic() - started_at
    print(f"Sites processed: {site_counter}")
    print(f'Took {total_time_took} seconds long')

This is the error the Lambda function is giving me:

{
  "errorMessage": "[Errno 38] Function not implemented",
  "errorType": "OSError",
  "stackTrace": [
    "  File \"/var/task/main.py\", line 90, in scan_handler\n    queue = Queue()\n",
    "  File \"/var/lang/lib/python3.8/multiprocessing/context.py\", line 103, in Queue\n    return Queue(maxsize, ctx=self.get_context())\n",
    "  File \"/var/lang/lib/python3.8/multiprocessing/queues.py\", line 42, in __init__\n    self._rlock = ctx.Lock()\n",
    "  File \"/var/lang/lib/python3.8/multiprocessing/context.py\", line 68, in Lock\n    return Lock(ctx=self.get_context())\n",
    "  File \"/var/lang/lib/python3.8/multiprocessing/synchronize.py\", line 162, in __init__\n    SemLock.__init__(self, SEMAPHORE, 1, 1, ctx=ctx)\n",
    "  File \"/var/lang/lib/python3.8/multiprocessing/synchronize.py\", line 57, in __init__\n    sl = self._semlock = _multiprocessing.SemLock(\n"
  ]
}

Does Queue() work in an AWS Lambda? What is the best way to accomplish my goal?

asked Jan 08 '20 by David La Grange


2 Answers

It doesn't look like multiprocessing.Queue is supported in Lambda. See this write-up on parallel processing on AWS Lambda with Python:

https://blog.ruanbekker.com/blog/2019/02/19/parallel-processing-on-aws-lambda-with-python-using-multiprocessing/
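The post linked above works around the limitation with multiprocessing.Pipe instead of a queue. A minimal sketch of that pattern (the worker function and its payload here are illustrative, not taken from the blog):

```python
from multiprocessing import Process, Pipe


def worker(conn):
    # Do some work, then send the result back through
    # the child end of the pipe and close it.
    conn.send([1, 2, 3])
    conn.close()


if __name__ == "__main__":
    parent_conn, child_conn = Pipe()
    p = Process(target=worker, args=(child_conn,))
    p.start()
    print(parent_conn.recv())  # [1, 2, 3]
    p.join()
```

Unlike multiprocessing.Queue, Pipe does not allocate a POSIX semaphore backed by /dev/shm, which is why it survives in the Lambda sandbox.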

answered Nov 11 '22 by David


From the AWS docs:

If you develop a Lambda function with Python, parallelism doesn’t come by default. Lambda supports Python 2.7 and Python 3.6, both of which have multiprocessing and threading modules.

The multiprocessing module that comes with Python lets you run multiple processes in parallel. Due to the Lambda execution environment not having /dev/shm (shared memory for processes) support, you can’t use multiprocessing.Queue or multiprocessing.Pool.

On the other hand, you can use multiprocessing.Pipe instead of multiprocessing.Queue to accomplish what you need without getting any errors during the execution of the Lambda function.

answered Nov 11 '22 by DineshKumar