I have a Celery worker running on Elastic Beanstalk that polls an SQS queue, gets messages (containing S3 file names), downloads those files from S3, and processes them. The worker is scheduled to run every 15 seconds, but for some reason its memory usage keeps increasing over time.
This is the code I'm using to access SQS and S3:
import json

import boto3


def get_messages_from_sqs(queue_url, queue_region="us-west-2", number_of_messages=1):
    # A new SQS client is created on every call.
    client = boto3.client('sqs', region_name=queue_region)
    sqs_response = client.receive_message(QueueUrl=queue_url, MaxNumberOfMessages=number_of_messages)
    messages = sqs_response.get("Messages", [])
    cleaned_messages = []
    for message in messages:
        body = json.loads(message["Body"])
        # Only the first record of each message is used.
        data = body["Records"][0]
        data["receipt_handle"] = message["ReceiptHandle"]
        cleaned_messages.append(data)
    return cleaned_messages


def download_file_from_s3(bucket_name, filename):
    # A new S3 client is created on every call.
    s3_client = boto3.client('s3')
    s3_client.download_file(bucket_name, filename, '/tmp/{}'.format(filename))
Do we need to close the client connection in boto3 once we're done with it? If so, how do we do that?
I have run into similar issues using Celery in production, completely unrelated to Boto. I do not have an explanation for the memory leak (finding one would take some serious code spelunking and profiling), but I can offer a workaround if your goal is just to not run out of memory.
Setting a maximum number of tasks per child makes Celery replace each worker process after it has executed that many tasks, so the memory the old process accumulated is reclaimed when it exits.
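As a rough illustration of that workaround, here is a minimal sketch of a Celery configuration module. The file name `celeryconfig.py` and the numeric values are assumptions chosen for the example, not recommendations; the setting names are the lowercase ones used by Celery 4 and later (older releases call the first one `CELERYD_MAX_TASKS_PER_CHILD`).

```python
# celeryconfig.py: hypothetical config module name; all values are illustrative.

# Replace each pool worker process after it has executed this many tasks,
# so memory held by the old process is released when it exits.
worker_max_tasks_per_child = 100

# Optional additional guard (Celery 4+): replace a worker process once its
# resident memory exceeds this limit, given in kilobytes. The worker finishes
# its current task before being replaced.
worker_max_memory_per_child = 200000
```

The module would be loaded with something like `app.config_from_object("celeryconfig")`. The task limit can also be passed on the command line with `--max-tasks-per-child=100` when starting the worker. A lower limit reclaims memory sooner at the cost of restarting processes more often, so the right value depends on how quickly memory grows per task.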