 

boto3 cannot create client on pyspark worker?

I'm trying to send data from the workers of a PySpark RDD to an SQS queue, using boto3 to talk to AWS. I need to send data directly from the partitions, rather than collecting the RDD and sending data from the driver.

I am able to send messages to SQS via boto3 locally and from the Spark driver; I can also import boto3 and create a boto3 session on the partitions. However, when I try to create a client or resource from the partitions I receive an error. I believe boto3 is not correctly creating a client, but I'm not entirely sure on that point. My code looks like this:

def get_client(x):   # x is the partition iterator that mapPartitions passes in
    import boto3
    client = boto3.client('sqs',
                          region_name="us-east-1",
                          aws_access_key_id="myaccesskey",
                          aws_secret_access_key="mysecretaccesskey")
    return x

rdd_with_client = rdd.mapPartitions(get_client)

The error:

DataNotFoundError: Unable to load data for: endpoints

The longer traceback:

File "<stdin>", line 4, in get_client
  File "./rebuilt.zip/boto3/session.py", line 250, in client
    aws_session_token=aws_session_token, config=config)
  File "./rebuilt.zip/botocore/session.py", line 810, in create_client
    endpoint_resolver = self.get_component('endpoint_resolver')
  File "./rebuilt.zip/botocore/session.py", line 691, in get_component
    return self._components.get_component(name)
  File "./rebuilt.zip/botocore/session.py", line 872, in get_component
    self._components[name] = factory()
  File "./rebuilt.zip/botocore/session.py", line 184, in create_default_resolver
    endpoints = loader.load_data('endpoints')
  File "./rebuilt.zip/botocore/loaders.py", line 123, in _wrapper
    data = func(self, *args, **kwargs)
  File "./rebuilt.zip/botocore/loaders.py", line 382, in load_data
    raise DataNotFoundError(data_path=name)
DataNotFoundError: Unable to load data for: endpoints

I've also tried modifying my function to create a resource instead of the explicit client, to see if it could find and use the default client setup. In that case, my code is:

def get_resource(x):
    import boto3
    sqs = boto3.resource('sqs',
                         region_name="us-east-1",
                         aws_access_key_id="myaccesskey",
                         aws_secret_access_key="mysecretaccesskey")
    return x

rdd_with_client = rdd.mapPartitions(get_resource)

I receive an error pointing to a has_low_level_client parameter, which is triggered because the client doesn't exist; the traceback says:

File "/usr/lib/spark/python/pyspark/rdd.py", line 2253, in pipeline_func
  File "/usr/lib/spark/python/pyspark/rdd.py", line 270, in func
  File "/usr/lib/spark/python/pyspark/rdd.py", line 689, in func
  File "<stdin>", line 4, in session_resource
  File "./rebuilt.zip/boto3/session.py", line 329, in resource
    has_low_level_client)
ResourceNotExistsError: The 'sqs' resource does not exist.
The available resources are:
   -

No resources available because, I think, there's no client to house them.

I've been banging my head against this one for a few days now. Any help appreciated!

asked Jun 21 '16 by EmmaOnThursday


1 Answer

This is because you have bundled boto3 as a zip file:

"./rebuilt.zip/boto3"

When botocore initialises a client, it loads its service and endpoint definitions from JSON data files shipped inside the package directory. Because your boto3 lives inside a zip archive, the filesystem-based loader can't read those files, which is why you get DataNotFoundError for 'endpoints'.
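
If you want to confirm this on a worker, botocore exposes the directories its loader scans. A minimal diagnostic sketch, run inside a partition function the same way get_client is (note that the print output lands in the executor logs, not on the driver):

def check_loader(x):   # same partition-iterator signature as get_client
    import botocore.loaders
    # Build the same loader botocore uses internally and print the
    # directories it scans for its JSON data files (endpoints, models, ...).
    loader = botocore.loaders.create_loader()
    print(loader.search_paths)
    # If these paths point inside a .zip archive, the loader cannot read
    # them, which is what produces the DataNotFoundError.
    return x

rdd.mapPartitions(check_loader).count()   # count() just forces evaluation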

The solution is: rather than distributing boto3 inside a zip, install boto3 in your Spark environment itself (for example with pip install boto3 on each node). Be careful here: you may need boto3 on both the master node and the worker nodes, depending on how your app is implemented. The safe bet is to install it on both.

If you are using EMR, you can use a bootstrap action to do it (a script that runs the pip install on every node as the cluster comes up).
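
Once boto3 is installed natively on every node, creating the client inside the partition function works. Here's a minimal sketch of what the sending side could look like with foreachPartition; the queue name is a made-up placeholder, and credentials are left to the node's IAM role or environment rather than hard-coded keys:

def send_partition(records):
    import boto3   # now resolved from the node's installed packages, not a zip
    sqs = boto3.client('sqs', region_name="us-east-1")
    # "my-queue" is a hypothetical name; substitute your own queue.
    queue_url = sqs.get_queue_url(QueueName="my-queue")["QueueUrl"]

    batch = []
    for i, record in enumerate(records):
        batch.append({"Id": str(i), "MessageBody": str(record)})
        if len(batch) == 10:   # SQS accepts at most 10 messages per batch
            sqs.send_message_batch(QueueUrl=queue_url, Entries=batch)
            batch = []
    if batch:
        sqs.send_message_batch(QueueUrl=queue_url, Entries=batch)

rdd.foreachPartition(send_partition)   # side effects only, nothing returned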

answered Sep 27 '22 by Tom Tang