
How to control flow of a transactional stream with Spring Data R2DBC?

Support for transactional streams seems to have been implemented only recently, so there are not many code examples yet.

Could someone show an example of a transactional stream that performs a series of database inserts and returns a value on success, with a checkpoint between the inserts that tests a condition and may roll back the transaction, returning a different value depending on the checkpoint result?

Ray Zhang asked Nov 02 '25 17:11


1 Answer

Reactive transactions follow the same pattern as imperative ones:

  1. Start a transaction before running any user-space commands
  2. Run the user-space commands
  3. Commit (or roll back)
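The three steps above can be sketched in plain Java without any database. This is only a toy model under stated assumptions, not Spring code: the class `TxLifecycleSketch`, its `inTransaction` method, and the in-memory list standing in for a table are all hypothetical names made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Toy model, not Spring code: a list stands in for a database table.
final class TxLifecycleSketch {

    private final List<String> committed = new ArrayList<>();

    // Runs `work` inside a simulated transaction.
    void inTransaction(Consumer<List<String>> work) {
        // 1. Begin before any user-space command runs: take a private working copy.
        List<String> working = new ArrayList<>(committed);
        // 2. Run the user-space commands against the working copy.
        //    An exception here skips the commit below, which acts as a rollback.
        work.accept(working);
        // 3. Commit: publish the working copy.
        committed.clear();
        committed.addAll(working);
    }

    // Returns a copy of the committed rows.
    List<String> rows() {
        return new ArrayList<>(committed);
    }

    public static void main(String[] args) {
        TxLifecycleSketch db = new TxLifecycleSketch();
        db.inTransaction(rows -> rows.add("set-1"));
        try {
            db.inTransaction(rows -> {
                rows.add("set-2");
                throw new IllegalStateException("checkpoint failed");
            });
        } catch (IllegalStateException e) {
            // The second insert was discarded together with its working copy.
        }
        System.out.println(db.rows()); // prints [set-1]
    }
}
```

A real transaction manager does the same bookkeeping against a database connection rather than a copied list, but the order of the steps is the same.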

A few aspects to note here: a connection is always associated with one materialization of a reactive sequence. The thread-bound connection that imperative programming ties to an execution corresponds to a materialization-bound connection in reactive programming.

So each (concurrent) execution gets its own connection assigned.

Spring Data R2DBC has no support for savepoints. Take a look at the following code example, which illustrates a decision to either commit or roll back:

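// Client and transactional operator share the same ConnectionFactory.
DatabaseClient databaseClient = DatabaseClient.create(connectionFactory);

TransactionalOperator transactionalOperator = TransactionalOperator
        .create(new R2dbcTransactionManager(connectionFactory));

// Everything assembled inside execute() runs within a single transaction.
transactionalOperator.execute(tx -> {

    Mono<Void> insert = databaseClient.execute("INSERT INTO legoset VALUES(…)")
            .then();

    Mono<Long> select = databaseClient.execute("SELECT COUNT(*) FROM legoset")
            .as(Long.class)
            .fetch()
            .first();

    // Checkpoint: run the insert, then inspect the row count.
    return insert.then(select.handle((count, sink) -> {

        if (count > 10) {
            // Mark the transaction so that it rolls back instead of committing.
            tx.setRollbackOnly();
        }

        // Nothing is passed to the sink, so the sequence completes empty.
    }));
}).as(StepVerifier::create).verifyComplete();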

Notable aspects here are:

  1. We're using TransactionalOperator instead of @Transactional.
  2. The code in .handle() calls setRollbackOnly() to roll back the transaction.

Using @Transactional, you would typically use exceptions to signal a rollback condition.
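To make the difference between a rollback-only flag and an exception more concrete, here is a rough plain-Java sketch of how a flag lets the callback return a value normally while its work is still discarded. It is a toy model, not Spring's implementation: `RollbackOnlySketch`, `Status`, `execute`, and `insertWithCheckpoint` are hypothetical names, and the list is an in-memory stand-in for a table.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiFunction;

// Toy model, not Spring code: a rollback-only flag next to a returned value.
final class RollbackOnlySketch {

    // Hypothetical stand-in for the `tx` handle passed to the callback.
    static final class Status {
        private boolean rollbackOnly;
        void setRollbackOnly() { rollbackOnly = true; }
        boolean isRollbackOnly() { return rollbackOnly; }
    }

    private final List<String> committed = new ArrayList<>();

    <T> T execute(BiFunction<Status, List<String>, T> callback) {
        Status status = new Status();
        List<String> working = new ArrayList<>(committed);
        T result = callback.apply(status, working);
        // Commit only if the callback did not ask for a rollback.
        if (!status.isRollbackOnly()) {
            committed.clear();
            committed.addAll(working);
        }
        return result; // returned whether or not the work was committed
    }

    int count() {
        return committed.size();
    }

    // Inserts one row, then checks a limit midstream.
    static String insertWithCheckpoint(RollbackOnlySketch db, String row, int limit) {
        return db.execute((tx, rows) -> {
            rows.add(row);
            if (rows.size() > limit) {
                tx.setRollbackOnly();
                return "rolled back";
            }
            return "committed";
        });
    }

    public static void main(String[] args) {
        RollbackOnlySketch db = new RollbackOnlySketch();
        System.out.println(insertWithCheckpoint(db, "set-1", 1)); // prints committed
        System.out.println(insertWithCheckpoint(db, "set-2", 1)); // prints rolled back
        System.out.println(db.count()); // prints 1
    }
}
```

With an exception-based rollback the caller would see an error instead of a value, which is why a flag is the more natural fit when the checkpoint result itself has to be returned.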

mp911de answered Nov 05 '25 12:11


