I am trying to implement pub/sub on MongoDB's oplog collection. The code below works without the tailable = True
option (it returns all documents), but as soon as I pass that option to the cursor it doesn't pick up anything, even after I make changes in the target collection.
I am using pymongo 2.7.2.
while True:
    with self.database.connect() as connection:
        cursor = connection['local'].oplog.rs.find(
            {'ns': self.collection},
            await_data=True,
            tailable=True
        )
        cursor.add_option(_QUERY_OPTIONS['oplog_replay'])
        while cursor.alive:
            try:
                doc = cursor.next()
                print doc
            except (AutoReconnect, StopIteration):
                time.sleep(1)
I have tried a few solutions, but it still fails as soon as the tailable option is added. The oplog is set up properly, since the mongo-oplog
module for Node.js works as expected.
Possible duplicate (no accepted answer)
You need to query on the oplog's 'ts' field and keep track of the last document you read (via its timestamp) in case the cursor has to be recreated. Here's an example you can modify to suit your needs:
import time

import pymongo

c = pymongo.MongoClient()
# Uncomment this for master/slave.
# oplog = c.local.oplog['$main']
# Uncomment this for replica sets.
oplog = c.local.oplog.rs
first = oplog.find().sort('$natural', pymongo.DESCENDING).limit(-1).next()
ts = first['ts']

while True:
    cursor = oplog.find({'ts': {'$gt': ts}}, tailable=True, await_data=True)
    # oplogReplay flag - not exposed in the public API
    cursor.add_option(8)
    while cursor.alive:
        for doc in cursor:
            ts = doc['ts']
            # Do something...
        time.sleep(1)
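If it helps to see the resume logic on its own, here is a minimal sketch that runs without a MongoDB server. The names `oplog_filter`, `follow`, and `fake_oplog` are hypothetical helpers made up for illustration, not part of pymongo, and plain integers stand in for the BSON timestamps a real oplog uses. It also assumes you want to keep the 'ns' filter from the question alongside the 'ts' condition.

```python
def oplog_filter(ns, last_ts):
    """Build a query for entries in namespace `ns` newer than `last_ts`."""
    return {'ns': ns, 'ts': {'$gt': last_ts}}


def follow(open_cursor, last_ts, handle, max_reopens=None):
    """Feed every new document to `handle`, reopening the cursor when it ends.

    `open_cursor(last_ts)` is a hypothetical factory. With pymongo 2.x it
    could wrap a find() call such as:
        lambda ts: oplog.find(oplog_filter(ns, ts), tailable=True, await_data=True)
    Returns the timestamp of the last document handled.
    """
    reopens = 0
    while max_reopens is None or reopens < max_reopens:
        for doc in open_cursor(last_ts):
            last_ts = doc['ts']  # remember our position before handling
            handle(doc)
        reopens += 1
    return last_ts


def fake_oplog(entries, batch_size):
    """In-memory stand-in for the oplog: each 'cursor' ends after batch_size docs."""
    def open_cursor(last_ts):
        newer = [e for e in entries if e['ts'] > last_ts]
        return iter(newer[:batch_size])
    return open_cursor


# Five fake oplog entries, read through cursors that each end after two documents.
entries = [{'ts': n, 'ns': 'test.items', 'op': 'i'} for n in range(1, 6)]
seen = []
last = follow(fake_oplog(entries, batch_size=2), 0, seen.append, max_reopens=3)
print([d['ts'] for d in seen], last)
```

Because every new cursor starts from the last timestamp that was handled, no entry is delivered twice or skipped when a cursor ends. Against a real oplog you would probably leave `max_reopens` unset and sleep briefly between reopens, as the loop above does.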