I am running the new MongoDB Server, version 3.6, and trying to add a Change Stream watch to a collection so that I get notified when documents are inserted or updated. I only receive notifications for updates, not for inserts.
This is the default way I have tried to add the watch:
IMongoDatabase mongoDatabase = mongoClient.GetDatabase("Sandbox");
IMongoCollection<BsonDocument> collection = mongoDatabase.GetCollection<BsonDocument>("TestCollection");
var changeStream = collection.Watch().ToEnumerable().GetEnumerator();
changeStream.MoveNext();
var next = changeStream.Current;
Then I downloaded the C# driver source code from MongoDB to see how they did this. Their test code for change stream watches creates a new document (insert), changes that document right away (update), and only THEN sets up the Change Stream watch to receive an 'update' notification. No example is given on how to watch for 'insert' notifications.
I have looked at the Java and NodeJS examples, both on the MongoDB website and on Stack Overflow, which seem straightforward and define a way to see both inserts and updates:
var changeStream = collection.watch({ '$match': { $or: [ { 'operationType': 'insert' }, { 'operationType': 'update' } ] } });
The API for the C# driver is vastly different; I would have assumed they would keep the same API for C# as for Java and NodeJS. I found few or no examples of doing the same thing in C#.
The closest I have come is the following attempt, but it still fails, and the documentation for the C# version is very limited (or I have not found the right location). The setup is as follows:
String json = "{ '$match': { 'operationType': { '$in': ['insert', 'update'] } } }";
var options = new ChangeStreamOptions { FullDocument = ChangeStreamFullDocumentOption.UpdateLookup };
PipelineDefinition<ChangeStreamDocument<BsonDocument>, ChangeStreamDocument<BsonDocument>> pipeline = new EmptyPipelineDefinition<ChangeStreamDocument<BsonDocument>>().Match(Builders<ChangeStreamDocument<BsonDocument>>.Filter.Text(json,"json"));
Then running the statement below throws an Exception:
{"Command aggregate failed: $match with $text is only allowed as the first pipeline stage."}
No other Filter option has worked either, and I have not found a way to just pass the JSON as a string to set the 'operationType'.
var changeStream = collection.Watch(pipeline, options).ToEnumerable().GetEnumerator();
changeStream.MoveNext();
var next = changeStream.Current;
My only goal here is to be able to set the 'operationType' using the C# driver. Does anyone know what I am doing wrong, or has anyone tried this using the C# driver and had success?
After reading through a large number of webpages with very little info on the C# version of the MongoDB driver, I am very stuck! Any help would be much appreciated.
Change streams transform a MongoDB database into a real-time database by taking advantage of MongoDB's replication process. They monitor replication in MongoDB, providing an API for external applications that require real-time data without the risk involved in tailing the oplog or the overhead that comes with polling.
You can watch for changes in MongoDB using the watch() method on the following objects: Collection, Database, and MongoClient.
MongoDB guarantees that the order of changes is preserved, so change stream notifications can be safely interpreted in the order received.
The $changeStream aggregation stage returns a change stream cursor on a collection, a database, or an entire cluster. It must be used as the first stage in an aggregation pipeline.
Here is a sample of code I've used to set up the collection Watch so that it receives "events" other than just document updates.
IMongoDatabase sandboxDB = mongoClient.GetDatabase("Sandbox");
IMongoCollection<BsonDocument> collection = sandboxDB.GetCollection<BsonDocument>("TestCollection");
//Get the whole document instead of just the changed portion
ChangeStreamOptions options = new ChangeStreamOptions() { FullDocument = ChangeStreamFullDocumentOption.UpdateLookup };
//The operationType can be one of the following: insert, update, replace, delete, invalidate
var pipeline = new EmptyPipelineDefinition<ChangeStreamDocument<BsonDocument>>().Match("{ operationType: { $in: [ 'replace', 'insert', 'update' ] } }");
var changeStream = collection.Watch(pipeline, options).ToEnumerable().GetEnumerator();
changeStream.MoveNext(); //Blocks until a document is replaced, inserted or updated in the TestCollection
ChangeStreamDocument<BsonDocument> next = changeStream.Current;
changeStream.Dispose(); //Release the enumerator and its underlying cursor when done
The EmptyPipelineDefinition...Match() argument could also be:
"{ $or: [ {operationType: 'replace' }, { operationType: 'insert' }, { operationType: 'update' } ] }"
if you wanted to use the $or operator, or
"{ operationType: /^[^d]/ }"
to throw a little regex in there. This last one matches every operationType that does not start with the letter 'd', so of the types listed above it excludes only 'delete'.
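To see which operation types that regex lets through, here is a small local sketch. It does not talk to MongoDB at all: it assumes .NET's Regex behaves the same as the server's regex engine for this simple pattern, and the class name OperationTypeRegexDemo and the Matching() helper are made up for illustration. The list of operation types is the one from the comment in the sample above.

```csharp
using System;
using System.Linq;
using System.Text.RegularExpressions;

// Hypothetical helper for illustration only; not part of the MongoDB driver.
public static class OperationTypeRegexDemo
{
    // The operation types listed in the sample above (MongoDB 3.6).
    public static readonly string[] OperationTypes =
        { "insert", "update", "replace", "delete", "invalidate" };

    // Same pattern as the filter "{ operationType: /^[^d]/ }":
    // the first character must be anything other than 'd'.
    private static readonly Regex NotStartingWithD = new Regex("^[^d]");

    // Returns the operation types the pattern would let through.
    public static string[] Matching() =>
        OperationTypes.Where(t => NotStartingWithD.IsMatch(t)).ToArray();

    public static void Main()
    {
        // prints: insert, update, replace, invalidate
        Console.WriteLine(string.Join(", ", Matching()));
    }
}
```

Note that 'invalidate' also passes this pattern, so the regex form is slightly broader than the $in and $or forms, which only list replace, insert and update.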