Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Handling concurrent transactions in spring data

I am using spring Data. I have a problem with the spring data concurrent transactions as follows:

The entity and the repositories are as follows:

    @Entity
    public class Wallet {

        @Version
        private int version;
        @Id
        @GeneratedValue
        @OrderColumn
        private Long Id;
        @OneToOne()
        @OrderColumn
        private User user;
        @OrderColumn
        private Double virtualBalance;
        @Column(name = "created_by")
        @OrderColumn
        private String createdBy;
        @Column(name = "created_date")
        @OrderColumn
        private Date createdDate;
        @Column(name = "updated_by")
        @OrderColumn
        private String updatedBy;
        @Column(name = "updated_date")
        @OrderColumn
        private Date updatedDate;
... Setters and getters ...
}

The repository is as follows

public interface WalletJpaRepository extends JpaRepository<Wallet, Long>{

    @Lock(LockModeType.OPTIMISTIC) // I have also tried PESSIMISTIC, READ, WRITE, PESSIMISTIC_READ, PESSIMISTIC_WRITE, etc.but they don't seem to work
    Wallet findOne(Long id);

}

I am making a method call to two of the methods concurrently as shown below:

@Test
    public void testConcurrentTransactions() {
        System.out.println("Wallet 1 : ->" + getWallet1());
        System.out.println("Wallet 2 : ->" + getWallet2());
    }

And the two methods are as described below

@Transactional(isolation = Isolation.SERIALIZABLE)
private Wallet getWallet1() {
    Wallet wallet1 = walletJpaRepository.findOne(new Long(1)); // suppose the value of wallet1.getVirtualBalance() is 1000
    wallet1.setVirtualBalance(wallet1.getVirtualBalance().doubleValue() + 100); // After evaluating this line it becomes 1100
    System.out.println(Thread.currentThread().getId());
    return wallet1;
}

@Transactional(isolation = Isolation.SERIALIZABLE)
private Wallet getWallet2() {
    Wallet wallet2 = walletJpaRepository.findOne(new Long(1)); // Here again the value of wallet2.getVirtualBalance() fetched is 1000 but I need 1100 to be the value read
    System.out.println(Thread.currentThread().getId());
    return wallet2;
}

The problem is that I am not getting updated values of the same entity in different method calls.

for example if the value of the entity with id 1 has value of 1000 initially after calling the method getWallet1() the value should be updated to 1100 but it doesn't get reflected in the second method i.e. getWallet2() and again I get 1000 in the second method as explained in the comments of the code above.

I have tried with propagation, Isolation, Lock but still I don't get the required results.

Is there a solution to handle such a scenerio, I am unable to find a solution to such a situation, This is a simplified version of a scenerio which I am getting in a huge monetary transaction system, where the hit rate is about 4 to 5 transactions per second.

The above is just an example in which I just tried to reproduce the scenerio, Below is the actual code for the same.

@Override
@Transactional
public InterWalletRequestFrontendWrapper approveOrDeclineRequest(User requestingUser, String operation,
        String requestId) {

    InterWalletRequest walletRequest = interWalletRequestJpaRepository.findOne(Long.parseLong(requestId));
    if (walletRequest.getStatus().equalsIgnoreCase(Utility.statusInitiated)
            || walletRequest.getStatus().equalsIgnoreCase(Utility.statusPending)) {
        if (operation.equalsIgnoreCase(Utility.operationDecline)) {
            walletRequest.setStatus(Utility.statusDeclined);
            interWalletRequestJpaRepository.save(walletRequest);
            InterWalletRequestFrontendWrapper response = fetchRaisedRequests(requestingUser);
            response.setStatus(0);
            response.setStatusDesc(Utility.statusDeclined);
            return response;
        } else {

            User admin = walletRequest.getRequestTo();
            Wallet adminWallet = admin.getWallet();

            if (adminWallet.getVirtualBalance() >= walletRequest.getAmount()) {
                try {

                    User user = walletRequest.getRequestFrom();

                    UserWalletTransaction txn1 = new UserWalletTransaction();
                    UserWalletTransaction txn2 = new UserWalletTransaction();
                    /**
                     * New transaction initiated for admin
                     */
                    txn1.setAmountTransacted(walletRequest.getAmount());
                    txn1.setDebitUser(admin);
                    txn1.setCreditUser(user);
                    txn1.setOperationPerformed(Utility.operationPerformedInterWallet);
                    txn1.setPreviousAmount(admin.getWallet().getVirtualBalance());
                    txn1.setStatus(Utility.statusNew);
                    txn1.setUser(admin);
                    txn1.setTransactionType(Utility.transactionTypeDebit);
                    txn1.setCreatedBy(admin.getUserName());
                    txn1.setUpdatedBy(admin.getUserName());
                    txn1.setCreatedDate(new Date());
                    txn1.setUpdatedDate(new Date());
                    txn1.setWallet(admin.getWallet());

                    /**
                     * New txn initiated for the user who walletRequested
                     * the txn.
                     */
                    txn2.setAmountTransacted(walletRequest.getAmount());
                    txn2.setDebitUser(admin);
                    txn2.setCreditUser(user);
                    txn2.setOperationPerformed(Utility.operationPerformedInterWallet);
                    txn2.setPreviousAmount(user.getWallet().getVirtualBalance());
                    txn2.setStatus(Utility.statusNew);
                    txn2.setTransactionType(Utility.transactionTypeCredit);
                    txn2.setCreatedBy(admin.getUserName());
                    txn2.setUpdatedBy(admin.getUserName());
                    txn2.setCreatedDate(new Date());
                    txn2.setUpdatedDate(new Date());
                    txn2.setUser(user);
                    txn2.setWallet(user.getWallet());

                    txn2 = walletTransactionJpaRepository.save(txn2);

                    Wallet wallet1 = admin.getWallet();
                    wallet1.setVirtualBalance(admin.getWallet().getVirtualBalance() - walletRequest.getAmount());
                    wallet1 = walletJpaRepository.save(wallet1);

                    /**
                     * After debit set the reference of other user.
                     */

                    txn1.setRelationalTransaction(txn2);
                    /**
                     * After debit from admin set balance amount
                     * 
                     */
                    txn1.setBalanceAmount(wallet1.getVirtualBalance());

                    /**
                     * Money deducted from admin wallet but not credited to
                     * the user wallet. so status is pending.
                     */
                    txn1.setStatus(Utility.statusPending);
                    txn1 = walletTransactionJpaRepository.save(txn1);

                    Wallet wallet2 = user.getWallet();
                    wallet2.setVirtualBalance(user.getWallet().getVirtualBalance() + walletRequest.getAmount());
                    wallet2 = walletJpaRepository.save(wallet2);

                    /**
                     * After credit to User wallet add balance amount.
                     */
                    txn2.setBalanceAmount(wallet2.getVirtualBalance());

                    txn1.setStatus(Utility.statusSuccess);
                    txn2.setStatus(Utility.statusSuccess);
                    txn2.setRelationalTransaction(txn1);

                    List<UserWalletTransaction> transactions = new ArrayList<>();
                    transactions.add(txn1);
                    transactions.add(txn2);

                    walletTransactionJpaRepository.save(transactions);

                    walletRequest.setStatus(Utility.statusApproved);
                    interWalletRequestJpaRepository.save(walletRequest);

                    InterWalletRequestFrontendWrapper response = fetchRaisedRequests(requestingUser);
                    response.setStatus(0);
                    response.setBalance(wallet1.getVirtualBalance());
                    response.setStatusDesc(Utility.statusApproved);
                    return response;

                } catch (Exception e) {
                    System.out.println(".......... Exception Caught ..........");
                    walletRequest.setStatus(Utility.statusPending);
                    interWalletRequestJpaRepository.save(walletRequest);
                    InterWalletRequestFrontendWrapper response = fetchRaisedRequests(requestingUser);
                    response.setStatus(0);
                    response.setStatusDesc(Utility.statusDeclined);
                    return response;
                }
            } else {
                /**
                 * if the admin wallet desn't have enough balance then the
                 * status is set to pending.
                 */
                walletRequest.setStatus(Utility.statusPending);
                interWalletRequestJpaRepository.save(walletRequest);
                InterWalletRequestFrontendWrapper response = fetchRaisedRequests(requestingUser);
                response.setStatus(0);
                response.setStatusDesc(Utility.statusDeclined);
                return response;
            }
        }
    } else {
        InterWalletRequestFrontendWrapper response = fetchRaisedRequests(requestingUser);
        response.setStatus(0);
        response.setStatusDesc(Utility.statusDeclined);
        return response;
    }

}

And another method which operates on the same entity is shown below

@Override
@Transactional
private UserWalletTransaction initiateVerifyTransaction(AccountsDetails transfer, User user) {

        Double amountTransacted = 2.00;
        Wallet wallet = user.getWallet();
        UserWalletTransaction transaction = new UserWalletTransaction();
        transaction.setAmountTransacted(amountTransacted);

        transaction.setPreviousAmount(wallet.getVirtualBalance());
        transaction.setOperationPerformed(Utility.operationPerformedDVBeneFundTransfer);
        transaction.setTransactionType(Utility.transactionTypeDebit);

        /**
         * Debit from wallet.
         */
        wallet.setVirtualBalance(wallet.getVirtualBalance() - amountTransacted);
        wallet.setUpdatedDate(new Date());
        wallet.setUpdatedBy(user.getUserName());
        wallet = walletJpaRepository.save(wallet);
        logger.info(wallet);

        transaction.setBalanceAmount(wallet.getVirtualBalance());
        transaction.setUser(user);
        transaction.setWallet(wallet);
        transaction.setStatus(Utility.statusNew);
        transaction.setCreatedBy(user.getUserName());
        transaction.setUpdatedBy(user.getUserName());
        transaction.setCreatedDate(new Date());
         transaction.setToAccount(transfer.getAccount());
         transaction.setBankName(transfer.getBankName());
         transaction.setBeniMobile(transfer.getRecipientMobileNo());
         transaction.setTransactionMode(transfer.getChannel().equalsIgnoreCase("2")
         ? "IMPS" : "NEFT");
        return walletTransactionJpaRepository.save(transaction);

    }

Like this there are seven methods in different services which access the wallet at the same time as there can be number of users logged in at the same time, And probabilities are that the users admin is also logged in and performing monetary transactions, that is the real scenerio where we get this problem.

Thanks in advance

like image 243
Kalyan Pradhan Avatar asked Aug 22 '16 09:08

Kalyan Pradhan


People also ask

How does Java handle concurrent transactions?

Example : thread A from server S1 and thread B from server S2 access the same record R at the same time. First the thread A from server S1 update the value of R to R1 and commit the transaction . At the same time the thread B from server S2 update the value of R to R2 and commit the transaction .

How does spring boot handle concurrent updates?

1. select * from my_entities where id = ?; 2. update my_entities set ..., version = <version value from query #1> + 1 where id = ? and version = <version value from query #1>; So if another concurrent process manages to update this entity first, then your method fails with an exception (OptimisticLockException).


1 Answers

Hello guys I am going to answer my own question this might help someone in future, I have found the solution to my problem. Thanks Denium for pointing out the problem. It's really a great concept.

The mistake I was doing was making internal calls to methods and writing @Transactional on the private methods.

@Transactional is implemented using spring AOPso the internal method calls never actually reach the proxy and the behaviour of the functionality of @Transactional is wierd.

So the solution was to wrap the methods in an object and defining the @Transactional on the methods of the object, and only making the external calls to the object.

Other solutions might be defining our own point cuts and advice

For more reference please visit the following links:

http://docs.spring.io/spring/docs/current/spring-framework-reference/html/aop.html https://www.mkyong.com/spring/spring-aop-example-pointcut-advisor/

Please feel free to add any suggestions and edits,

Thanks

like image 103
Kalyan Pradhan Avatar answered Oct 04 '22 04:10

Kalyan Pradhan