
How to use boto3 client with Python multiprocessing?

Code looks something like this:

import multiprocessing as mp
from functools import partial

import boto3
import numpy as np


s3 = boto3.client('s3')

def _something(**kwargs):
    # Some mixed integer programming stuff related to the variable archive
    return np.array(some_variable_related_to_archive)


def do(s3):
    archive = np.load(s3.get_object('some_key')) # Simplified -- details not relevant
    pool = mp.Pool()
    sub_process = partial(_something, slack=0.1)
    parts = np.array_split(archive, some_int)
    target_parts = np.array(things)

    out = pool.starmap(sub_process, [x for x in zip(parts, target_parts)])  # Error occurs at this line

    pool.close()
    pool.join()

do(s3)

The error:

_pickle.PicklingError: Can't pickle <class 'botocore.client.S3'>: attribute lookup S3 on botocore.client failed

I have very limited experience with the Python multiprocessing library. I'm not sure why it throws the above error, since the S3 client isn't a parameter of any of the functions being mapped. Note that the code runs fine if the archive file is loaded from disk rather than from S3.

Any help/guidance would be greatly appreciated.

asked Jul 12 '18 by RNHTTR

People also ask

How does Boto3 client work?

Boto3 generates the client from a JSON service definition file. The client's methods support every single type of interaction with the target AWS service. Resources, on the other hand, are generated from JSON resource definition files.

Should I use client or resource Boto3?

To summarize, resources are higher-level abstractions of AWS services compared to clients. Resources are the recommended pattern to use boto3 as you don't have to worry about a lot of the underlying details when interacting with AWS services. As a result, code written with Resources tends to be simpler.
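
For illustration, here is a minimal sketch of the same listing written both ways; the bucket name is a placeholder:

import boto3

# Low-level client: method names map directly onto AWS API operations.
s3_client = boto3.client('s3')
response = s3_client.list_objects_v2(Bucket='example-bucket')  # placeholder bucket
for obj in response.get('Contents', []):
    print(obj['Key'])

# Higher-level resource: the same listing through object abstractions.
s3_resource = boto3.resource('s3')
for obj in s3_resource.Bucket('example-bucket').objects.all():
    print(obj.key)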

Is Boto3 client Threadsafe?

The boto3.client function is not thread-safe.
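
Because of that, a common pattern is to give each thread its own session and client rather than sharing one created at module level. A minimal sketch, with a placeholder bucket name:

import threading

import boto3

_local = threading.local()

def get_s3_client():
    # Create one client per thread from a per-thread Session instead of
    # calling boto3.client() on the shared default session.
    if not hasattr(_local, 's3'):
        _local.s3 = boto3.session.Session().client('s3')
    return _local.s3

def fetch(key):
    obj = get_s3_client().get_object(Bucket='example-bucket', Key=key)  # placeholder bucket
    return obj['Body'].read()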

What is Boto3 client?

Boto3 client is a low-level service class to connect to AWS service. It provides similar methods available in the AWS API. All the methods available in the AWS API are available in the Boto3 client.
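
For example, the S3 ListBuckets API operation is exposed on the client as list_buckets, which returns the raw response as a dict (a minimal sketch):

import boto3

s3 = boto3.client('s3')
response = s3.list_buckets()  # corresponds one-to-one to the ListBuckets API call
print(response['ResponseMetadata']['HTTPStatusCode'])
for bucket in response.get('Buckets', []):
    print(bucket['Name'])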


2 Answers

Well, I solved it in a pretty straightforward way: I used a smaller, less complex object instead of the client. Specifically, I used the Bucket class of the boto3 resource API.

However, you should take into consideration the following post: Can't pickle when using multiprocessing Pool.map(). I put every object related to boto3 outside of any class or function. Some other posts suggest putting the S3 objects and functions inside the function you're trying to parallelize in order to avoid overhead; I haven't tried that yet, though. Below I share code that saves information to S3 in the msgpack file format.

My code example is as follows (everything boto3-related sits outside any class or function). Hope it helps.

import pandas as pd
import boto3
from botocore.exceptions import ClientError
from pathos.pools import ProcessPool  # pathos uses dill, which can serialize the nested function below

module = 'msgpack_s3'  # prefix used in the log messages below

s3 = boto3.resource('s3')
s3_bucket_name = 'bucket-name'
s3_bucket = s3.Bucket(s3_bucket_name)

def msgpack_dump_s3(df, filename):
    try:
        s3_bucket.put_object(Body=df.to_msgpack(), Key=filename)
        print(module, filename + " successfully saved into s3 bucket '" + s3_bucket.name + "'")
    except Exception as e:
        # log everything else as a warning and continue
        print(module, "Failed saving object. Continuing. {}".format(e))

def msgpack_load_s3(filename):
    try:
        return s3_bucket.Object(filename).get()['Body'].read()
    except ClientError as ex:
        if ex.response['Error']['Code'] == 'NoSuchKey':
            print(module, 'No object found - returning None')
            return None
        else:
            print(module, "Failed loading object. Continuing. {}".format(ex))
            raise ex
    except Exception as e:
        # log everything else as a warning and continue
        print(module, "Failed loading object. Continuing. {}".format(e))
    return None

def upper_function(files, ncpus):  # filenames to process and number of worker processes

    def function_to_parallelize(filename):
        file = msgpack_load_s3(filename)
        if file is None:
            return
        df = pd.read_msgpack(file)
        # do something with df here

        print('\t\t\tSaving updated info...')
        msgpack_dump_s3(df, filename)

    pool = ProcessPool(nodes=ncpus)
    # map the function over the filenames, then collect the results
    results = pool.imap(function_to_parallelize, files)
    print("...")
    print(list(results))
    # With an asynchronous map (pool.amap) one could instead poll for completion:
    # while not results.ready():
    #     time.sleep(5)
    #     print(".", end=' ')
answered Oct 08 '22 by Pablo Andres Perez Quevedo


Objects passed to mp.starmap() must be pickle-able, and S3 clients are not pickle-able.
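
To see this directly, try pickling a client; this is a minimal sketch, and the exact exception raised varies with the boto3/botocore version:

import pickle

import boto3

s3 = boto3.client('s3')
try:
    pickle.dumps(s3)  # clients hold sockets and locks, so this fails
except Exception as e:
    print(type(e).__name__, e)

Bringing the actions of the S3 client outside of the function that calls mp.starmap() solves the issue: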

import multiprocessing as mp
from functools import partial

import boto3
import numpy as np


s3 = boto3.client('s3')
archive = np.load(s3.get_object('some_key'))  # Simplified -- details not relevant; the S3 call now happens here, outside of do()

def _something(**kwargs):
    # Some mixed integer programming stuff related to the variable archive
    return np.array(some_variable_related_to_archive)


def do(archive): # pass the previously loaded archive, and not the s3 object into the function
    pool = mp.Pool()
    sub_process = partial(_something, slack=0.1)
    parts = np.array_split(archive, some_int)
    target_parts = np.array(things)

    out = pool.starmap(sub_process, [x for x in zip(parts, target_parts)])  # No pickling error: only numpy arrays are passed to the workers

    pool.close()
    pool.join()

do(archive) # pass the previously loaded archive, and not the s3 object into the function
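
Alternatively, if each worker needs its own S3 access (for example, to fetch its part of the data itself), the client can be created inside each worker process so that it never has to cross the process boundary. This is a minimal sketch rather than the original code; the bucket and keys are placeholders:

import multiprocessing as mp

import boto3

_worker_s3 = None

def _init_worker():
    # Runs once in every worker process; the client is never pickled.
    global _worker_s3
    _worker_s3 = boto3.client('s3')

def _process_key(key):
    body = _worker_s3.get_object(Bucket='example-bucket', Key=key)['Body'].read()
    return len(body)  # stand-in for the real processing

if __name__ == '__main__':
    keys = ['part-0.npy', 'part-1.npy']  # placeholder object keys
    with mp.Pool(initializer=_init_worker) as pool:
        print(pool.map(_process_key, keys))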
answered Oct 08 '22 by RNHTTR