Java concurrent locks on the Map key level

A writer updates prices by calling the putPrice method. A reader uses getPrice to get the latest price. hasPriceChanged returns a boolean indicating whether the price has changed since getPrice was last invoked.

I am looking for the fastest solution. I am trying to achieve thread-safe, consistent reads and writes on the map at the key level.

I think that locking the whole map may cause a performance issue, which is why I decided to lock at the key level. Unfortunately, it doesn't work as expected and blocks the whole map. Why? Could you please help me figure out what I am doing wrong here?

UPDATE:

I guess this can be summarised in two questions: 1. How do I provide free access to the rest of the keys while one is being updated? 2. How do I guarantee that my methods are atomic, given that each requires multiple read/write operations? E.g. getPrice() gets the price and updates the hasChanged flag.

PriceHolder.java

public final class PriceHolder {

    private ConcurrentMap<String, Price> prices;

    public PriceHolder() {
        this.prices = new ConcurrentHashMap<>();

        //Receive starting prices..
        Price EUR = new Price();
        EUR.setHasChangedSinceLastRead(true);
        EUR.setPrice(new BigDecimal(0));

        Price USD = new Price();
        USD.setHasChangedSinceLastRead(true);
        USD.setPrice(new BigDecimal(0));
        this.prices.put("EUR", EUR);
        this.prices.put("USD", USD);

    }

    /** Called when a price ‘p’ is received for an entity ‘e’ */
    public void putPrice(
            String e,
            BigDecimal p) throws InterruptedException {

            synchronized (prices.get(e)) {
                Price currentPrice = prices.get(e);
                if (currentPrice != null && !currentPrice.getPrice().equals(p)) {
                    currentPrice.setHasChangedSinceLastRead(true);
                    currentPrice.setPrice(p);
                } else {
                    Price newPrice = new Price();
                    newPrice.setHasChangedSinceLastRead(true);
                    newPrice.setPrice(p);
                    prices.put(e, newPrice);
                }
            }
    }

    /** Called to get the latest price for entity ‘e’ */
    public BigDecimal getPrice(String e) {
        Price currentPrice = prices.get(e);
        if(currentPrice != null){
            synchronized (prices.get(e)){
                currentPrice.setHasChangedSinceLastRead(false);
                prices.put(e, currentPrice);
            }
            return currentPrice.getPrice();
        }
        return null;
    }

    /**
     * Called to determine if the price for entity ‘e’ has
     * changed since the last call to getPrice(e).
     */
    public boolean hasPriceChanged(String e) {
        synchronized (prices.get(e)){
            return prices.get(e) != null ? prices.get(e).isHasChangedSinceLastRead() : false;
        }
    }
}

Price.java

public class Price {

    private BigDecimal price;

    public boolean isHasChangedSinceLastRead() {
        return hasChangedSinceLastRead;
    }

    public void setHasChangedSinceLastRead(boolean hasChangedSinceLastRead) {
        this.hasChangedSinceLastRead = hasChangedSinceLastRead;
    }

    public BigDecimal getPrice() {
        return price;
    }

    public void setPrice(BigDecimal price) {
        this.price = price;
    }

    private boolean hasChangedSinceLastRead = false;

}
Wild Goat asked Nov 03 '15 14:11


2 Answers

How to use a ConcurrentMap depends heavily on the Java version. With Java 8 or newer, you get almost everything for free:

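import java.math.BigDecimal;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public final class PriceHolder {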

    private ConcurrentMap<String, Price> prices;

    public PriceHolder() {
        this.prices = new ConcurrentHashMap<>();

        //Receive starting prices..
        Price EUR = new Price();
        EUR.setHasChangedSinceLastRead(true);
        EUR.setPrice(BigDecimal.ZERO);

        Price USD = new Price();
        USD.setHasChangedSinceLastRead(true);
        USD.setPrice(BigDecimal.ZERO);
        this.prices.put("EUR", EUR);
        this.prices.put("USD", USD);

    }

    /** Called when a price ‘p’ is received for an entity ‘e’ */
    public void putPrice(String e, BigDecimal p) {
        prices.compute(e, (k, price)-> {
            if(price==null) price=new Price();
            price.setHasChangedSinceLastRead(true);
            price.setPrice(p);
            return price;
        });
    }

    /** Called to get the latest price for entity ‘e’ */
    public BigDecimal getPrice(String e) {
        Price price = prices.computeIfPresent(e, (key, value) -> {
            value.setHasChangedSinceLastRead(false);
            return value;
        });
        return price==null? null: price.getPrice();
    }

    /**
     * Called to determine if the price for entity ‘e’ has
     * changed since the last call to getPrice(e).
     */
    public boolean hasPriceChanged(String e) {
        final Price price = prices.get(e);
        return price!=null && price.isHasChangedSinceLastRead();
    }
}

The compute… methods of ConcurrentHashMap run the computation atomically for the affected key while letting updates of other entries proceed. Its documentation notes that some update operations by other threads may be blocked while the computation is in progress, so the function passed in should be short. For simple get access like in hasPriceChanged, no additional synchronization is necessary as long as you call get only once per method, i.e. keep the result in a local variable while examining it.
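To illustrate the per-key atomicity described above, here is a minimal sketch that is not part of the original answer. The class name ComputeDemo and its run method are hypothetical names chosen for illustration. Several threads increment counters under two keys through compute, and no increment should be lost:

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the original answer.
final class ComputeDemo {

    /** Increments per-key counters from several threads and returns the resulting map. */
    static ConcurrentMap<String, Integer> run(int threads, int incrementsPerThread) {
        ConcurrentMap<String, Integer> counters = new ConcurrentHashMap<>();
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int t = 0; t < threads; t++) {
            // Even-numbered workers update "EUR", odd-numbered workers update "USD".
            String key = (t % 2 == 0) ? "EUR" : "USD";
            pool.execute(() -> {
                for (int i = 0; i < incrementsPerThread; i++) {
                    // The read-modify-write below runs atomically for this key.
                    counters.compute(key, (k, v) -> v == null ? 1 : v + 1);
                }
            });
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(30, TimeUnit.SECONDS)) {
                throw new IllegalStateException("workers did not finish in time");
            }
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(ex);
        }
        return counters;
    }

    public static void main(String[] args) {
        // With 4 threads and 10,000 increments each, both counters should reach 20000.
        System.out.println(run(4, 10_000));
    }
}
```

Replacing compute with a separate get followed by put would typically lose updates under contention, which is the multi-step problem the question describes.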


Before Java 8, things are more complicated. There, all that ConcurrentMap offers are a few atomic update methods (putIfAbsent, replace, remove), which can be used to build higher-level update operations in a try-and-repeat fashion.

To use these atomic methods cleanly, the best way is to make the value class immutable:
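As a rough illustration of the try-and-repeat idea, reduced to a plain counter, here is a sketch written without lambdas so that it also compiles on older Java versions. The class RetryLoopDemo and its methods are hypothetical names, not part of the original answer:

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical demo class, not part of the original answer.
final class RetryLoopDemo {

    /** Atomically adds delta to the value stored under key, retrying on contention. */
    static int addAndGet(ConcurrentMap<String, Integer> map, String key, int delta) {
        for (;;) {
            Integer old = map.get(key);
            if (old == null) {
                // No entry yet: succeeds only if nobody inserted one in the meantime.
                if (map.putIfAbsent(key, delta) == null) {
                    return delta;
                }
            } else {
                Integer updated = old + delta;
                // Succeeds only if the entry still holds the value we read.
                if (map.replace(key, old, updated)) {
                    return updated;
                }
            }
            // Another thread won the race: re-read and try again.
        }
    }

    /** Runs two threads that each add 1 to "EUR" perThread times; returns the final value. */
    static int demo(final int perThread) {
        final ConcurrentMap<String, Integer> map = new ConcurrentHashMap<String, Integer>();
        Runnable work = new Runnable() {
            @Override
            public void run() {
                for (int i = 0; i < perThread; i++) {
                    addAndGet(map, "EUR", 1);
                }
            }
        };
        Thread a = new Thread(work);
        Thread b = new Thread(work);
        a.start();
        b.start();
        try {
            a.join();
            b.join();
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(ex);
        }
        return map.get("EUR");
    }

    public static void main(String[] args) {
        System.out.println(demo(1000)); // prints 2000
    }
}
```

The loop never blocks. A thread that loses a race simply re-reads the current value and tries again, so a stalled thread cannot hold up updates to other keys.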

import java.math.BigDecimal;

public final class Price {

    private final BigDecimal price;
    private final boolean hasChangedSinceLastRead;

    Price(BigDecimal value, boolean changed) {
      price=value;
      hasChangedSinceLastRead=changed;
    }
    public boolean isHasChangedSinceLastRead() {
        return hasChangedSinceLastRead;
    }
    public BigDecimal getPrice() {
        return price;
    }
}

Then use it to always construct a new object reflecting the desired new state and perform atomic updates using either putIfAbsent or replace:

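import java.math.BigDecimal;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public final class PriceHolder {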

    private ConcurrentMap<String, Price> prices;

    public PriceHolder() {
        this.prices = new ConcurrentHashMap<>();

        //Receive starting prices..
        Price EUR = new Price(BigDecimal.ZERO, true);
        Price USD = EUR; // we can re-use immutable objects...
        this.prices.put("EUR", EUR);
        this.prices.put("USD", USD);
    }

    /** Called when a price ‘p’ is received for an entity ‘e’ */
    public void putPrice(String e, BigDecimal p) {
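      Price old, updated = new Price(p, true);
      do {
          // re-read the current value on every attempt
          old = prices.get(e);
          // insert if absent, otherwise swap only if the entry is unchanged; repeat on failure
      } while (old == null ? prices.putIfAbsent(e, updated) != null
                           : !prices.replace(e, old, updated));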
    }

    /** Called to get the latest price for entity ‘e’ */
    public BigDecimal getPrice(String e) {
        for(;;) {
          Price price = prices.get(e);
          if(price==null) return null;
          if(!price.isHasChangedSinceLastRead()
          || prices.replace(e, price, new Price(price.getPrice(), false)))
            return price.getPrice();
        }
    }

    /**
     * Called to determine if the price for entity ‘e’ has
     * changed since the last call to getPrice(e).
     */
    public boolean hasPriceChanged(String e) {
        final Price price = prices.get(e);
        return price!=null && price.isHasChangedSinceLastRead();
    }
}
Holger answered Nov 12 '22 14:11


How about something like this:

class AtomicPriceHolder {

  private volatile BigDecimal value;
  private volatile boolean dirtyFlag;

  public AtomicPriceHolder( BigDecimal initialValue) {
    this.value = initialValue;
    this.dirtyFlag = true;
  }

  public synchronized void updatePrice( BigDecimal newPrice ) {
    if ( !this.value.equals( newPrice ) ) {
      this.value = newPrice;
      this.dirtyFlag = true;
    }
  }

  public boolean isDirty() {
    return this.dirtyFlag;
  }

  public BigDecimal peek() {
    return this.value;
  }

  public synchronized BigDecimal read() {
    this.dirtyFlag = false;
    return this.value;
  }

}

...

public void updatePrice( String id, BigDecimal value ) {

  AtomicPriceHolder holder;
  synchronized( someGlobalSyncObject ) {
    holder = prices.get(id);
    if ( holder == null ) {
      prices.put( id, new AtomicPriceHolder( value ) );
      return;
    }
  }

  holder.updatePrice( value );

}

Note, though, that it probably does not make sense to do it this way, because the actual atomic modification of the price's value is so fast that you cannot expect to gain anything from unlocking the map beforehand.

The conditional operation "check if it's in the map; if not, create a new one and insert it" must be atomic, and should be done by locking the whole map for that brief period. Anything else would require a dedicated synchronization object for each key. These would have to be stored and managed somewhere, and access to that store would have to be synchronized again, and so on.

Just use coarse-grained locking to ensure correctness and then move on.
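A minimal sketch of what such coarse-grained locking could look like, assuming a plain HashMap guarded by a single lock object. The class CoarseLockedPrices and its methods are hypothetical, and the lock field stands in for someGlobalSyncObject from the snippet above:

```java
import java.math.BigDecimal;
import java.util.HashMap;
import java.util.Map;

// Hypothetical class, not part of the original answer.
final class CoarseLockedPrices {

    // Single lock guarding every access to the map (stand-in for someGlobalSyncObject).
    private final Object lock = new Object();
    private final Map<String, BigDecimal> prices = new HashMap<>();

    /** Stores the price; returns true if a new key was inserted or the value changed. */
    boolean update(String id, BigDecimal value) {
        synchronized (lock) {
            BigDecimal old = prices.get(id);
            // compareTo ignores scale, so 1.10 and 1.1 count as the same price here
            // (an assumption of this sketch; the original code uses equals).
            if (old != null && old.compareTo(value) == 0) {
                return false;
            }
            prices.put(id, value);
            return true;
        }
    }

    /** Returns the current price for id, or null if none is stored. */
    BigDecimal get(String id) {
        synchronized (lock) {
            return prices.get(id);
        }
    }
}
```

Each critical section only performs a lookup and possibly a put, so the lock is held very briefly. Whether that is fast enough depends on the workload and would need to be measured.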

JimmyB answered Nov 12 '22 15:11