I have a Celery task on Heroku that connects to an external API, retrieves some data, stores it in the database, and repeats several hundred times. Very quickly (after ~10 loops) Heroku starts warning about high memory usage. Any ideas?
tasks.py
@app.task
def retrieve_details():
    for p in PObj.objects.filter(some_condition=True):
        p.fetch()
models.py
def fetch(self):
    v_data = self.service.getV(**dict(
        Number=self.v.number
    ))
    response = self.map_response(v_data)
    for key in ["some_key", "some_other_key"]:
        setattr(self.v, key, response.get(key))
    self.v.save()
Heroku logs
2017-01-01 10:26:25.568 131 <45>1 2017-01-01T10:26:25.457354+00:00 heroku run.5891 - - Process running mem=595M(116.2%)
2017-01-01 10:26:25.634 132 <45>1 2017-01-01T10:26:25.457411+00:00 heroku run.5891 - - Error R14 (Memory quota exceeded)
You're basically loading a bunch of data into a Python dictionary in memory. This will cause a lot of memory overhead, especially if you are grabbing a lot of objects from the local database.
Do you really need to store all of these objects in a dictionary?
What most people do for things like this is process the records one at a time: pull a single object, handle it, save the result, then move on to the next one.
This way, you only end up storing a single object in memory at any given time, thereby greatly reducing your memory footprint.
If I were you, I'd look for ways to move my logic into the database query, or simply process each item individually.
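As a minimal sketch of that second option, you could stream the rows with Django's QuerySet.iterator() instead of letting the queryset cache every result (this reuses the PObj model and fetch() method from the question):
@app.task
def retrieve_details():
    # .iterator() skips the queryset result cache, so only the object
    # currently being processed is kept around between loop iterations
    for p in PObj.objects.filter(some_condition=True).iterator():
        p.fetch()
This keeps the single-task structure from the question while avoiding holding every PObj in memory at once.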
To expand on the veritable rdegges' thoughts, here are two strategies I have used in the past when working with Celery/Python to help reduce the memory footprint: (1) kick off subtasks that each process exactly one object, and/or (2) use generators.
kick off subtasks that each process exactly one object:
@app.task
def retrieve_details():
    qs = PObj.objects.filter(some_condition=True)
    for p in qs.values_list('id', flat=True):
        do_fetch.delay(p)
@app.task
def do_fetch(n_id):
    p = PObj.objects.get(id=n_id)
    p.fetch()
Now you can tune Celery so that it kills off worker processes after they have processed N tasks, keeping the memory footprint low, using --max-tasks-per-child.
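For illustration, the worker invocation might look like this (proj is just a placeholder for your own Celery app module):
# recycle each worker process after it has handled 50 tasks
celery -A proj worker --max-tasks-per-child=50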
Using generators: you can also try this with a generator so that you can (theoretically) throw away each PObj after you call fetch:
def ps_of_interest(chunk=10):
    n = chunk
    start = 0
    while n == chunk:
        some_ps = list(PObj.objects.filter(some_condition=True)[start:start + n])
        n = len(some_ps)
        start += chunk
        for p in some_ps:
            yield p
@app.task
def retrieve_details():
    for p in ps_of_interest():
        p.fetch()
For my money, I’d go with option #1.