 

How can Python Observe Changes to MongoDB's Oplog

I have multiple Python scripts writing to MongoDB using PyMongo. How can another Python script observe changes to the results of a MongoDB query and run some function when a change occurs? MongoDB is set up with the oplog enabled.

Nyxynyx asked Jan 01 '14 17:01



3 Answers

I wrote an incremental backup tool for MongoDB some time ago, in Python. The tool monitors data changes by tailing the oplog. Here is the relevant part of the code.

Updated answer, MongoDB 3.6+

As datdinhquoc cleverly points out in the comments below, for MongoDB 3.6 and up there are Change Streams.
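
A minimal sketch of the change-stream approach, assuming PyMongo 3.6 or later and a replica set (change streams are not available on a standalone server). The function name `watch_changes`, the `max_events` parameter, and the `test.events` namespace are made up for illustration; only `Collection.watch()` comes from PyMongo.

```python
def watch_changes(collection, handler, pipeline=None, max_events=None):
    """Call handler(change) for every change event on the collection.

    Blocks until the stream ends or max_events events have been handled.
    Returns the number of events handled.
    """
    seen = 0
    # watch() returns a ChangeStream, which is iterable and closes itself
    # when used as a context manager.
    with collection.watch(pipeline or []) as stream:
        for change in stream:
            handler(change)
            seen += 1
            if max_events is not None and seen >= max_events:
                break
    return seen


def main():
    # Imported here so the helper above can be used without PyMongo installed.
    from pymongo import MongoClient

    coll = MongoClient().test.events  # hypothetical database and collection
    # Only report inserts, updates and replacements.
    pipeline = [{'$match': {'operationType': {'$in': ['insert', 'update', 'replace']}}}]
    watch_changes(coll, print, pipeline)
```

Calling `main()` would print one change document per matching write. Each document includes an `operationType` field and a resume token in `_id`, which can be stored and passed back to `watch()` as `resume_after` to continue after a restart.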

Updated answer, pymongo 3

from time import sleep

from pymongo import MongoClient, ASCENDING
from pymongo.cursor import CursorType
from pymongo.errors import AutoReconnect

# Time to wait for data or connection.
_SLEEP = 1.0

if __name__ == '__main__':
    oplog = MongoClient().local.oplog.rs
    stamp = oplog.find().sort('$natural', ASCENDING).limit(-1).next()['ts']

    while True:
        kw = {}

        kw['filter'] = {'ts': {'$gt': stamp}}
        kw['cursor_type'] = CursorType.TAILABLE_AWAIT
        kw['oplog_replay'] = True

        cursor = oplog.find(**kw)

        try:
            while cursor.alive:
                for doc in cursor:
                    stamp = doc['ts']

                    print(doc)  # Do something with doc.

                sleep(_SLEEP)

        except AutoReconnect:
            sleep(_SLEEP)

Also see http://api.mongodb.com/python/current/examples/tailable.html.

Original answer, pymongo 2

from time import sleep

from pymongo import MongoClient
from pymongo.cursor import _QUERY_OPTIONS
from pymongo.errors import AutoReconnect
from bson.timestamp import Timestamp

# Tailable cursor options.
_TAIL_OPTS = {'tailable': True, 'await_data': True}

# Time to wait for data or connection.
_SLEEP = 10

if __name__ == '__main__':
    db = MongoClient().local

    while True:
        query = {'ts': {'$gt': Timestamp(some_timestamp, 0)}}  # Replace with your query.
        cursor = db.oplog.rs.find(query, **_TAIL_OPTS)

        cursor.add_option(_QUERY_OPTIONS['oplog_replay'])

        try:
            while cursor.alive:
                try:
                    doc = next(cursor)

                    # Do something with doc.

                except (AutoReconnect, StopIteration):
                    sleep(_SLEEP)

        finally:
            cursor.close()

Rafa answered Sep 25 '22 15:09


I ran into this issue today and haven't found an updated answer anywhere.

As of PyMongo 3.0, find() no longer accepts the tailable and await_data arguments; the cursor_type option replaces them. This example tails the oplog and prints each record that is newer than the last one it saw.

# Adapted from the example here: https://jira.mongodb.org/browse/PYTHON-735
# to work with pymongo 3.0

import pymongo
from pymongo.cursor import CursorType

c = pymongo.MongoClient()

# Use this for master/slave deployments.
oplog = c.local.oplog['$main']
# Use this instead for replica sets.
#oplog = c.local.oplog.rs

# Start from the most recent oplog entry.
last = next(oplog.find().sort('$natural', pymongo.DESCENDING).limit(-1))
ts = last['ts']

while True:
    cursor = oplog.find({'ts': {'$gt': ts}}, cursor_type=CursorType.TAILABLE_AWAIT, oplog_replay=True)
    while cursor.alive:
        for doc in cursor:
            ts = doc['ts']
            print(doc)
            # Work with doc here

circuitBurn answered Sep 24 '22 15:09


Query the oplog with a tailable cursor.

It is actually funny, because oplog-monitoring is exactly what the tailable-cursor feature was added for originally. I find it extremely useful for other things as well (e.g. implementing a mongodb-based pubsub, see this post for example), but that was the original purpose.
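
As a rough sketch of what to do with the entries such a cursor returns: each oplog document carries an `op` code (`i` for insert, `u` for update, `d` for delete), the namespace `ns` in the form `database.collection`, and the payload in `o`. The helpers below build a filter restricted to one collection and route each entry to a callback. Their names and the `mydb.items` namespace are made up for illustration; they are plain dictionary code and not part of PyMongo.

```python
# Hypothetical helpers; the names are illustrative and not part of PyMongo.

_OP_NAMES = {'i': 'insert', 'u': 'update', 'd': 'delete'}


def oplog_filter(namespace, after_ts):
    """Build a query for data changes to one collection newer than after_ts."""
    return {
        'ts': {'$gt': after_ts},
        'ns': namespace,                 # e.g. 'mydb.items' (assumed name)
        'op': {'$in': list(_OP_NAMES)},  # skip no-ops ('n') and commands ('c')
    }


def dispatch(entry, handlers):
    """Call the handler registered for this entry's operation, if any.

    handlers maps 'insert', 'update' or 'delete' to a callable.
    Returns True when a handler ran, False otherwise.
    """
    name = _OP_NAMES.get(entry.get('op'))
    handler = handlers.get(name)
    if handler is None:
        return False
    handler(entry)
    return True
```

The result of `oplog_filter(...)` would be passed to `find()` on the oplog collection together with a tailable cursor type, with `after_ts` taken from the `ts` of the last entry processed, and `dispatch(doc, handlers)` would be called for each document the cursor yields.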

shx2 answered Sep 25 '22 15:09