Support for transactional streams seems to have been implemented recently, but because it is so new, there are not many code examples.
Could someone show an example of a transactional stream that does a series of database inserts and then returns some value on success, but with a midstream checkpoint in between inserts that tests some condition and might roll back the transaction and return different values depending on the checkpoint result?
Reactive transactions follow the same pattern as imperative ones.
A few aspects to note here: a connection is always associated with a materialization of a reactive sequence. The thread-bound connection that is tied to an execution in imperative programming translates to a connection tied to a materialization in reactive programming.
So each (concurrent) execution gets a connection assigned.
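As a rough, dependency-free analogy (plain Java, not the Spring or R2DBC API), the sketch below treats a Supplier as the lazy sequence and each get() call as one materialization. The names withConnection and pool are hypothetical, and the counter only stands in for a connection pool.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.function.Supplier;

public class ConnectionPerMaterialization {

    // Describes the work lazily: nothing is acquired until get() is called.
    // "pool" is a hypothetical stand-in that hands out numbered connections.
    static <T> Supplier<T> withConnection(AtomicInteger pool, Function<Integer, T> work) {
        return () -> {
            int connection = pool.incrementAndGet(); // acquired once per materialization
            return work.apply(connection);
        };
    }

    public static void main(String[] args) {
        AtomicInteger pool = new AtomicInteger();
        Supplier<String> pipeline = withConnection(pool, c -> "ran on connection " + c);

        System.out.println(pool.get());      // 0: defining the pipeline acquires nothing
        System.out.println(pipeline.get());  // ran on connection 1
        System.out.println(pipeline.get());  // ran on connection 2
    }
}
```

Defining the pipeline costs nothing. Only running it acquires a connection, and running it twice acquires two.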
Spring Data R2DBC has no support for savepoints. The following code example illustrates a decision to either commit or roll back:
DatabaseClient databaseClient = DatabaseClient.create(connectionFactory);
TransactionalOperator transactionalOperator = TransactionalOperator
        .create(new R2dbcTransactionManager(connectionFactory));

transactionalOperator.execute(tx -> {

    // First step: the insert, with its result discarded.
    Mono<Void> insert = databaseClient.execute("INSERT INTO legoset VALUES(…)")
            .then();

    // Checkpoint: count the rows after the insert.
    Mono<Long> select = databaseClient.execute("SELECT COUNT(*) FROM legoset")
            .as(Long.class)
            .fetch()
            .first();

    // Mark the transaction rollback-only when the checkpoint fails.
    // Nothing is passed to the sink, so the sequence completes empty.
    return insert.then(select.handle((count, sink) -> {
        if (count > 10) {
            tx.setRollbackOnly();
        }
    }));
}).as(StepVerifier::create).verifyComplete();
Notable aspects here are:
- TransactionalOperator instead of @Transactional.
- .handle() calls setRollbackOnly() to roll back the transaction.
Using @Transactional, you would typically use exceptions to signal a rollback condition.
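The question also asks for a different return value depending on the checkpoint. The control flow for that can be sketched without a database. What follows is a plain-Java model, not the Spring API: Tx, execute and insertWithCheckpoint are hypothetical names, and the "table" is just an in-memory list whose working copy is published only on commit.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiFunction;

public class RollbackDecisionSketch {

    // Hypothetical stand-in for the transaction handle passed to the callback.
    static final class Tx {
        private boolean rollbackOnly;
        void setRollbackOnly() { rollbackOnly = true; }
        boolean isRollbackOnly() { return rollbackOnly; }
    }

    // Runs the callback against a working copy of the table and publishes the
    // copy only when the callback did not mark the transaction rollback-only.
    static <T> T execute(List<String> table, BiFunction<Tx, List<String>, T> callback) {
        Tx tx = new Tx();
        List<String> working = new ArrayList<>(table);
        T result = callback.apply(tx, working);
        if (!tx.isRollbackOnly()) {
            table.clear();
            table.addAll(working);
        }
        return result;
    }

    // Insert, checkpoint on the row count, insert again, and report the outcome.
    static String insertWithCheckpoint(List<String> table, String first, String second, int limit) {
        return execute(table, (tx, rows) -> {
            rows.add(first);
            if (rows.size() > limit) {   // midstream checkpoint
                tx.setRollbackOnly();
                return "rolled back";    // value returned when the checkpoint fails
            }
            rows.add(second);
            return "committed";          // value returned on success
        });
    }

    public static void main(String[] args) {
        List<String> table = new ArrayList<>();
        System.out.println(insertWithCheckpoint(table, "a", "b", 10) + " " + table); // committed [a, b]
        System.out.println(insertWithCheckpoint(table, "c", "d", 2) + " " + table);  // rolled back [a, b]
    }
}
```

In the reactive code above, the equivalent step would presumably be to call sink.next(...) inside .handle() with one value after tx.setRollbackOnly() and another value otherwise, so that execute emits the outcome. The StepVerifier would then have to expect that value rather than an empty completion.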