I use a python mongo driver to connect to the MongoDB. although It works properly for other tasks as well, after a while I've got below exception just for this task:
Additional Info:
AutoReconnectpymongo.pool in _raise_connection_failure error
mongo-router-3:27017: [Errno 104] Connection reset by peer
Here is my code:
import mongoengine
from celery.schedules import crontab
from celery.task import Task, PeriodicTask
from pymongo import UpdateOne
def chunker(array, n=10000):
for i in range(0, len(array), n):
yield array[i:i + n]
class StatCalculator(Task):
name = "StatCalculator"
routing_key = 'my-routing-key'
soft_time_limit = 3600 * 2
time_limit = 3600 * 2
max_retries = 1
def __init__(self):
self.col = None
super(StatCalculator, self).__init__()
def run(self, ids):
self._initialize()
self._process(ids)
self._persist()
self._finalize()
def _initialize(self):
self.col = mongoengine.connection._get_db().my_collection
self.calculated_times = {}
def _process(self, ids):
"""
This method fetch the items and append them to `calculated_times`
@param ids:
@return:
"""
pass
def _persist(self):
bulk_ops = []
for key, stat in self.calculated_times.items():
bulk_ops.append(UpdateOne(
{'some_id': key['some_id'],
'created_at': self.now},
{'$set': stat},
upsert=True
))
if bulk_ops:
# raise AutoReconnectpymongo.pool in _raise_connection_failure
# mongo-router:27017: [Errno 104] Connection reset by peer
self.col.bulk_write(bulk_ops, ordered=False)
def _finalize(self):
pass
class PeriodicStatCalculator(PeriodicTask):
name = "PeriodicStatCalculator"
run_every = crontab(hour="3", minute="0")
routing_key = 'my-routing-key'
soft_time_limit = 3600 * 1
time_limit = 3600 * 1
max_retries = 1
def run(self, *args, **kwargs):
ids = [] # a list of some ids
for chunk in chunker(ids):
StatCalculator().delay(chunk)
Periodic tasks run every day and fetch some ids from a Postgres table, then another celery task will be called by periodic task for fetching, processing, and storing some information respectively to the passed ids.
finally, processed information will be stored in a Mongo collection by the bulk_write method
Environments:
django-rest-framework-mongoengine==3.4.1
mongoengine==0.20.0
pymongo==3.7.0
celery==4.3.0rc1
django-celery==3.3.1
Questions:
Could be any number of reasons:
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With