Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

AutoReconnectpymongo.pool in _raise_connection_failure

I use a python mongo driver to connect to the MongoDB. although It works properly for other tasks as well, after a while I've got below exception just for this task:

Additional Info:

  • The mongodb has three sharded cluster
  • The exception raised at bulk_write method
AutoReconnectpymongo.pool in _raise_connection_failure error
mongo-router-3:27017: [Errno 104] Connection reset by peer

Here is my code:

import mongoengine
from celery.schedules import crontab
from celery.task import Task, PeriodicTask
from pymongo import UpdateOne


def chunker(array, n=10000):
    for i in range(0, len(array), n):
        yield array[i:i + n]


class StatCalculator(Task):
    name = "StatCalculator"
    routing_key = 'my-routing-key'
    soft_time_limit = 3600 * 2
    time_limit = 3600 * 2
    max_retries = 1

    def __init__(self):
        self.col = None

        super(StatCalculator, self).__init__()

    def run(self, ids):
        self._initialize()
        self._process(ids)
        self._persist()
        self._finalize()

    def _initialize(self):
        self.col = mongoengine.connection._get_db().my_collection
        self.calculated_times = {}

    def _process(self, ids):
        """
        This method fetch the items and append them to `calculated_times`

        @param ids:
        @return:
        """
        pass

    def _persist(self):
        bulk_ops = []

        for key, stat in self.calculated_times.items():
            bulk_ops.append(UpdateOne(
                {'some_id': key['some_id'],
                 'created_at': self.now},
                {'$set': stat},
                upsert=True
            ))

        if bulk_ops:
            # raise AutoReconnectpymongo.pool in _raise_connection_failure
            # mongo-router:27017: [Errno 104] Connection reset by peer
            self.col.bulk_write(bulk_ops, ordered=False)

    def _finalize(self):
        pass


class PeriodicStatCalculator(PeriodicTask):
    name = "PeriodicStatCalculator"
    run_every = crontab(hour="3", minute="0")
    routing_key = 'my-routing-key'
    soft_time_limit = 3600 * 1
    time_limit = 3600 * 1
    max_retries = 1

    def run(self, *args, **kwargs):
        ids = []  # a list of some ids

        for chunk in chunker(ids):
            StatCalculator().delay(chunk)

Periodic tasks run every day and fetch some ids from a Postgres table, then another celery task will be called by periodic task for fetching, processing, and storing some information respectively to the passed ids.

finally, processed information will be stored in a Mongo collection by the bulk_write method

Environments:

  • django-rest-framework-mongoengine==3.4.1

  • mongoengine==0.20.0

  • pymongo==3.7.0

  • celery==4.3.0rc1

  • django-celery==3.3.1

Questions:

  • Why I've got this exception?
  • How I can overcome this exception?
like image 310
Phoenix Avatar asked Nov 07 '22 05:11

Phoenix


1 Answers

Could be any number of reasons:

  • You have an unreliable network between the application and the database. Move the application closer to the database, the database closer to the application or get a better service provider.
  • You are using an old driver with the higher TCP keepalive settings. Upgrade to most recent driver which uses lower settings.
  • The server runs out of resources and aborts the operation and/or closes the connection. Review server logs for clues.
like image 68
D. SM Avatar answered Nov 11 '22 17:11

D. SM