Transaction rollback in a reactive application

I am using RxJava 1.1 to compose an observable sequence from inside a Spring application that looks like the following:

@Transactional
public Observable<Event> create(Event event) {
    return Observable.just(event)
            .flatMap(e -> {
                // save event to db (blocking JPA operation)
                Event saved = eventRepository.save(e);
                return Observable.just(saved);
            })
            // async REST call to service A (may execute on a different thread)
            .flatMap(this::sendEventToServiceA)
            // async REST call to service B (may execute on a different thread)
            .flatMap(this::sendEventToServiceB)
            .doOnError(throwable -> {
                // ? roll back the initially created transaction?
            });
}

An event reaches the service layer of my application from a controller class and propagates through a chain of operations built with RxJava's flatMap() operator. The event is first stored in the database (Spring Data), and then two asynchronous HTTP requests are executed one after the other, using Spring's AsyncRestTemplate behind the scenes.

If an error or exception is thrown anywhere in the pipeline, I would like to roll back the database transaction so that the event is NOT stored in the database. This turned out to be hard, because Spring associates the transaction context with the thread that started it. So if the code reaches the doOnError callback on a different thread (AsyncRestTemplate uses its own AsyncTaskExecutor), it is not possible to roll back the initially created transaction.
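
To make the thread-bound behaviour concrete, here is a minimal sketch using only the JDK. It assumes nothing about Spring beyond the fact that its transaction context is kept in thread-local storage; the class name, the CURRENT_TX field, and the "tx-1" value are hypothetical stand-ins, not Spring APIs.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ThreadBoundContextDemo {

    // Hypothetical stand-in for a per-thread transaction context.
    private static final ThreadLocal<String> CURRENT_TX = new ThreadLocal<>();

    /** Binds a value on the calling thread and reads it back on the same thread. */
    public static String readOnSameThread() {
        CURRENT_TX.set("tx-1");
        try {
            return CURRENT_TX.get();
        } finally {
            CURRENT_TX.remove();
        }
    }

    /** Binds a value on the calling thread, then tries to read it from a worker thread. */
    public static String readOnOtherThread() {
        CURRENT_TX.set("tx-1");
        ExecutorService worker = Executors.newSingleThreadExecutor();
        try {
            Callable<String> read = CURRENT_TX::get;
            return worker.submit(read).get();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            worker.shutdown();
            CURRENT_TX.remove();
        }
    }

    public static void main(String[] args) {
        System.out.println("same thread:  " + readOnSameThread());
        System.out.println("other thread: " + readOnOtherThread());
    }
}
```

The worker thread sees no value at all, which mirrors what happens when a callback running on the AsyncRestTemplate executor looks for the transaction that was opened on the request thread.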

Can you please advise any mechanism to achieve transactions across a multi-threaded application composed of several asynchronous operations written in this way?

I have also tried to create a transaction programmatically with:

TransactionStatus status = transactionManager.getTransaction(new DefaultTransactionDefinition());

and then pass the TransactionStatus object along with the event through the pipeline, but when an error occurs and I invoke "platformTransactionManager.rollback(status);", I get "transaction synchronization is not active", presumably because this runs on a different thread.

p.s. The sendEventToServiceA / sendEventToServiceB methods look similar to this:

public Observable<Event> sendEventToServiceA(Event event) {
    ..........
    ListenableFuture<ResponseEntity<String>> listenableFuture = asyncRestTemplate.exchange(
              "/serviceA/create?event_id=" + event.id,
              HttpMethod.POST, requestEntity, String.class);

    return ObservableUtil.toRxObservable(listenableFuture);
}

asked Mar 08 '16 16:03 by odybour

1 Answer

One way to do this is to ensure that the error is observed on the same thread as the database save, for example by running both on a single-threaded scheduler:

@Transactional
public Observable<Event> create(Event event) {
    // a single-threaded scheduler, so the save and the error callback share one thread
    Scheduler scheduler = Schedulers.from(Executors.newSingleThreadExecutor());
    return Observable.just(event)
            .flatMap(e -> {
                // save event to db (blocking JPA operation)
                Event saved = eventRepository.save(e);
                return Observable.just(saved);
            })
            .subscribeOn(scheduler)
            // async REST call to service A (may execute on a different thread)
            .flatMap(this::sendEventToServiceA)
            // async REST call to service B (may execute on a different thread)
            .flatMap(this::sendEventToServiceB)
            .observeOn(scheduler)
            .doOnError(throwable -> {
                // ? roll back the initially created transaction?
            });
}
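
The thread-affinity idea can be tried without RxJava or Spring. The following is a minimal sketch using only java.util.concurrent. It is an analogue of pinning the save and the error callback to one single-threaded scheduler, not a translation of the code above. The class name, callServiceA, and the executor names are hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;

public class SameThreadErrorHandlingDemo {

    // Hypothetical stand-in for a remote call that always fails.
    private static String callServiceA(String event) {
        throw new IllegalStateException("service A failed");
    }

    /** Returns true if the failure was handled on the thread that performed the "save". */
    public static boolean errorHandledOnSaveThread() {
        ExecutorService txThread = Executors.newSingleThreadExecutor();
        ExecutorService httpPool = Executors.newFixedThreadPool(2);
        AtomicReference<Thread> saveThread = new AtomicReference<>();
        try {
            return CompletableFuture
                    // "save" step: runs on the single dedicated thread
                    .supplyAsync(() -> {
                        saveThread.set(Thread.currentThread());
                        return "event-1";
                    }, txThread)
                    // async step: runs on a different pool and fails
                    .thenApplyAsync(SameThreadErrorHandlingDemo::callServiceA, httpPool)
                    // error handling: hops back to the dedicated thread
                    .handleAsync((value, error) ->
                            error != null && Thread.currentThread() == saveThread.get(), txThread)
                    .join();
        } finally {
            txThread.shutdown();
            httpPool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(errorHandledOnSaveThread());
    }
}
```

This only shows that the error handler can be brought back to the thread that did the save. Whether a transaction is still open on that thread depends on where it was started and when it completes, which the snippet above does not settle.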

answered Sep 24 '22 09:09 by Dave Moten