I am writing a test where I need to upsert some documents(let's say 10) in my couchbase bucket before actually running any tests. So I have a method annotated with @BeforeAll that tries to upset these documents. Now when I try to run the test, the test fails because documents were not persisted by then. In order to wait for these documents to be inserted I am doing something like this -
Flux.fromIterable(couchDocs)
.map(couchDoc -> bucket.upsert(couchDoc, persistTo)
.collectList()
.block();
But still when I run the test I can see that the documents were not persisted by then and the assertions fail. Am I missing something here?
Use .flatMap instead of .map. You inner stream is not getting subscribed to.
Finally solved this issue regarding couchbase and its state not totally updated after block
More notes on this issue. It was actually not an error but Couchbase uses a scanning consistency for its internal indexes of UNBOUNDED. This means that if any creates, deletes, etc are performed on couchbase then its internal state takes a tiny amount of time to reindex. This was causing the intermittent behavour when running the reactor service. Especially when doing a bulk create followed by a get items. Using the query option command QueryOptions.queryOptions().scanConsistency(QueryScanConsistency.REQUEST_PLUS) forced couchbase to update its indexes prior to running the query. This is exactly the behaviour we need and putting this in fixed all this shit ive been having.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With