I'm trying to send data from the workers of a PySpark RDD to an SQS queue, using boto3 to talk to AWS. I need to send the data directly from the partitions, rather than collecting the RDD and sending it from the driver.
I am able to send messages to SQS via boto3 locally and from the Spark driver; I can also import boto3 and create a boto3 session on the partitions. However, when I try to create a client or resource from the partitions I receive an error. I believe boto3 is not correctly creating a client, but I'm not entirely sure on that point. My code looks like this:
    def get_client(x):  # the x is required to use pyspark's mapPartitions
        import boto3
        client = boto3.client('sqs', region_name="us-east-1", aws_access_key_id="myaccesskey", aws_secret_access_key="mysecretaccesskey")
        return x

    rdd_with_client = rdd.mapPartitions(get_client)
The error:
    DataNotFoundError: Unable to load data for: endpoints
The longer traceback:
File "<stdin>", line 4, in get_client
File "./rebuilt.zip/boto3/session.py", line 250, in client
aws_session_token=aws_session_token, config=config)
File "./rebuilt.zip/botocore/session.py", line 810, in create_client
endpoint_resolver = self.get_component('endpoint_resolver')
File "./rebuilt.zip/botocore/session.py", line 691, in get_component
return self._components.get_component(name)
File "./rebuilt.zip/botocore/session.py", line 872, in get_component
self._components[name] = factory()
File "./rebuilt.zip/botocore/session.py", line 184, in create_default_resolver
endpoints = loader.load_data('endpoints')
File "./rebuilt.zip/botocore/loaders.py", line 123, in _wrapper
data = func(self, *args, **kwargs)
File "./rebuilt.zip/botocore/loaders.py", line 382, in load_data
raise DataNotFoundError(data_path=name)
DataNotFoundError: Unable to load data for: endpoints
I've also tried modifying my function to create a resource instead of the explicit client, to see if it could find & use the default client setup. In that case, my code is:
    def get_resource(x):
        import boto3
        sqs = boto3.resource('sqs', region_name="us-east-1", aws_access_key_id="myaccesskey", aws_secret_access_key="mysecretaccesskey")
        return x

    rdd_with_client = rdd.mapPartitions(get_resource)
I receive an error pointing to a has_low_level_client parameter, which is triggered because the client doesn't exist; the traceback says:
File "/usr/lib/spark/python/pyspark/rdd.py", line 2253, in pipeline_func
File "/usr/lib/spark/python/pyspark/rdd.py", line 270, in func
File "/usr/lib/spark/python/pyspark/rdd.py", line 689, in func
File "<stdin>", line 4, in session_resource
File "./rebuilt.zip/boto3/session.py", line 329, in resource
has_low_level_client)
ResourceNotExistsError: The 'sqs' resource does not exist.
The available resources are:
-
No resources available because, I think, there's no client to house them.
I've been banging my head against this one for a few days now. Any help appreciated!
This is because you have boto3 bundled inside a zip file:

    "./rebuilt.zip/boto3"

At initialisation, botocore (which boto3 is built on) loads its endpoint and service data files, plain JSON shipped with the package, from the filesystem. Because your boto3/botocore packages live inside a zip archive, the loader cannot reach those files, which is why you see DataNotFoundError: Unable to load data for: endpoints.
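If you want to confirm this, a quick diagnostic sketch (assuming you can run Python in the same environment your executors use) is to print the directories botocore's loader searches for its data files; none of them can usefully point inside a zip:

    import botocore.session

    # 'data_loader' is the same component the traceback goes through.
    loader = botocore.session.get_session().get_component('data_loader')

    # These are plain filesystem directories; botocore cannot read its
    # JSON data (endpoints, service models) out of a zip archive.
    for path in loader.search_paths:
        print(path)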
The solution is, rather than distributing boto3 inside a zip, to install boto3 in your Spark environment itself. Be careful here: you may need boto3 on both the master node and the worker nodes, depending on how you implement your app. The safe bet is to install it on both.
If you are using EMR, you can use a bootstrap action to do it.
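Once boto3 is installed on the nodes, creating the client inside the partition function works. As a minimal sketch (the queue URL is a placeholder, and I'm assuming credentials come from the instance role or environment rather than hard-coded keys), you can push each partition's records straight to SQS:

    def send_partition(records):
        import boto3  # resolved from the installed package, not a zip

        # Create the client on the worker, once per partition.
        sqs = boto3.client('sqs', region_name="us-east-1")
        queue_url = "https://sqs.us-east-1.amazonaws.com/123456789012/my-queue"  # placeholder

        for record in records:
            sqs.send_message(QueueUrl=queue_url, MessageBody=str(record))

    # foreachPartition avoids collecting to the driver; use mapPartitions
    # (and yield the records back) if you still need the RDD afterwards.
    rdd.foreachPartition(send_partition)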