I have multiple Python scripts writing to MongoDB using PyMongo. How can another Python script observe changes to a MongoDB query and perform some function when a change occurs? MongoDB is set up with the oplog enabled.
All replica set members contain a copy of the oplog, in the local.oplog.rs collection, which allows them to maintain the current state of the database. To facilitate replication, all replica set members send heartbeats (pings) to all other members. Any secondary member can import oplog entries from any other member.
Introduction to the MongoDB oplog
Like many other databases, MongoDB keeps a transaction log internally; in MongoDB this is known as the oplog. It serves as an internal log of every operation that modifies data and is used for replication in a MongoDB cluster. Each replica set in a sharded cluster has its own oplog.
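As a quick way to see what these oplog entries look like, the minimal sketch below reads the most recent one; it assumes a replica set member running locally on the default port.

# Minimal sketch: peek at the most recent entry in the oplog.
# Assumes a replica set member reachable on localhost with the default port.
from pymongo import MongoClient, DESCENDING

client = MongoClient()
oplog = client.local.oplog.rs

# Each oplog document carries (among other fields) a 'ts' timestamp, an 'op'
# code (e.g. i for insert, u for update, d for delete), the namespace 'ns',
# and the operation payload 'o'.
latest = next(oplog.find().sort('$natural', DESCENDING).limit(-1))
print(latest['ts'], latest.get('op'), latest.get('ns'))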
The first step to connect Python to Atlas is setting up the MongoDB cluster. Next, create a file named pymongo_test_insert.py in any folder for the PyMongo code; any simple text editor such as TextPad or Notepad will do. Use the connection string to create the MongoClient and get the MongoDB database connection.
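A minimal sketch of such a file is shown below; the connection string is a placeholder, and the database and collection names are arbitrary examples.

# pymongo_test_insert.py -- minimal sketch. The URI below is a placeholder;
# substitute your own Atlas connection string and credentials.
from pymongo import MongoClient

connection_string = "mongodb+srv://<user>:<password>@<cluster>.mongodb.net/?retryWrites=true&w=majority"
client = MongoClient(connection_string)

db = client["test_db"]                          # hypothetical database name
db["test_collection"].insert_one({"ping": 1})   # hypothetical collection and document
print(db["test_collection"].count_documents({}))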
To check the size of the oplog for a given replica set member, connect to the member in mongosh and run the rs.printReplicationInfo() method. The oplog should be long enough to hold all transactions for the longest downtime you expect on a secondary.
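A rough Python-side equivalent is to compare the first and last oplog timestamps, as in the sketch below; it assumes a direct connection to a replica set member on localhost.

# Rough equivalent of rs.printReplicationInfo(): compute the time span
# currently covered by the oplog. Assumes a local replica set member.
from pymongo import MongoClient, ASCENDING, DESCENDING

oplog = MongoClient().local.oplog.rs
first = next(oplog.find().sort('$natural', ASCENDING).limit(-1))
last = next(oplog.find().sort('$natural', DESCENDING).limit(-1))

window_seconds = last['ts'].time - first['ts'].time
print('oplog window: %.1f hours' % (window_seconds / 3600.0))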
I wrote an incremental backup tool for MongoDB some time ago, in Python. The tool monitors data changes by tailing the oplog. Here is the relevant part of the code.
Updated answer, MongoDB 3.6+
As datdinhquoc cleverly points out in the comments below, for MongoDB 3.6 and up there are Change Streams.
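A minimal change stream sketch (MongoDB 3.6+, pymongo 3.6+) might look like the following; the database and collection names are placeholders.

# Minimal change stream sketch; 'my_db' and 'my_collection' are placeholders.
from pymongo import MongoClient

client = MongoClient()
with client.my_db.my_collection.watch() as stream:
    for change in stream:
        # Each 'change' document describes one insert/update/replace/delete event.
        print(change['operationType'], change.get('fullDocument'))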
Updated answer, pymongo 3
from time import sleep

from pymongo import MongoClient, ASCENDING
from pymongo.cursor import CursorType
from pymongo.errors import AutoReconnect

# Time to wait for data or connection.
_SLEEP = 1.0

if __name__ == '__main__':
    oplog = MongoClient().local.oplog.rs
    stamp = oplog.find().sort('$natural', ASCENDING).limit(-1).next()['ts']

    while True:
        kw = {}
        kw['filter'] = {'ts': {'$gt': stamp}}
        kw['cursor_type'] = CursorType.TAILABLE_AWAIT
        kw['oplog_replay'] = True

        cursor = oplog.find(**kw)
        try:
            while cursor.alive:
                for doc in cursor:
                    stamp = doc['ts']
                    print(doc)  # Do something with doc.
                sleep(_SLEEP)
        except AutoReconnect:
            sleep(_SLEEP)
Also see http://api.mongodb.com/python/current/examples/tailable.html.
Original answer, pymongo 2
from time import sleep

from pymongo import MongoClient
from pymongo.cursor import _QUERY_OPTIONS
from pymongo.errors import AutoReconnect
from bson.timestamp import Timestamp

# Tailable cursor options.
_TAIL_OPTS = {'tailable': True, 'await_data': True}

# Time to wait for data or connection.
_SLEEP = 10

if __name__ == '__main__':
    db = MongoClient().local

    while True:
        query = {'ts': {'$gt': Timestamp(some_timestamp, 0)}}  # Replace with your query.

        cursor = db.oplog.rs.find(query, **_TAIL_OPTS)
        cursor.add_option(_QUERY_OPTIONS['oplog_replay'])
        try:
            while cursor.alive:
                try:
                    doc = next(cursor)
                    # Do something with doc.
                except (AutoReconnect, StopIteration):
                    sleep(_SLEEP)
        finally:
            cursor.close()
I ran into this issue today and haven't found an updated answer anywhere. The Cursor class changed as of pymongo 3.0 and no longer accepts the tailable and await_data arguments. This example tails the oplog and prints each oplog record it finds that is newer than the last one it processed.
# Adapted from the example here: https://jira.mongodb.org/browse/PYTHON-735
# to work with pymongo 3.0
import pymongo
from pymongo.cursor import CursorType

c = pymongo.MongoClient()

# Uncomment this for master/slave.
oplog = c.local.oplog['$main']
# Uncomment this for replica sets.
#oplog = c.local.oplog.rs

first = next(oplog.find().sort('$natural', pymongo.DESCENDING).limit(-1))
ts = first['ts']

while True:
    cursor = oplog.find({'ts': {'$gt': ts}}, cursor_type=CursorType.TAILABLE_AWAIT, oplog_replay=True)
    while cursor.alive:
        for doc in cursor:
            ts = doc['ts']
            print(doc)
            # Work with doc here
Query the oplog with a tailable cursor.
It is actually funny, because oplog monitoring is exactly what the tailable-cursor feature was originally added for. I find it extremely useful for other things as well (e.g. implementing a MongoDB-based pub/sub, see this post for example), but that was the original purpose.
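The pub/sub idea can be sketched with a capped collection and a tailable cursor, along the lines below; the collection name 'pubsub', its size, and the 'channel'/'payload' fields are arbitrary choices for illustration.

# Sketch of a capped-collection pub/sub using a tailable cursor.
from time import sleep
from pymongo import MongoClient
from pymongo.cursor import CursorType

db = MongoClient().test
if 'pubsub' not in db.list_collection_names():
    db.create_collection('pubsub', capped=True, size=1024 * 1024)

# Seed one message so the tailable cursor has something to latch onto.
db.pubsub.insert_one({'channel': 'jobs', 'payload': 'seed'})

# Publisher side (could run in another script):
# db.pubsub.insert_one({'channel': 'jobs', 'payload': 'hello'})

# Subscriber side: tail the capped collection much like the oplog.
cursor = db.pubsub.find({'channel': 'jobs'}, cursor_type=CursorType.TAILABLE_AWAIT)
while cursor.alive:
    for msg in cursor:
        print(msg['payload'])
    sleep(1)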