
Concurrently Pair Matches

I'm looking for a Java concurrency idiom to pair up a large number of elements with the highest possible throughput.

Consider that I have "people" coming in from multiple threads. Each "person" is looking for a match. When one finds another waiting "person", the two are assigned to each other and removed for processing.

I don't want to lock a big structure to change states. Consider that Person has getMatch and setMatch. Before being submitted, each person's #getMatch is null. When they are unblocked (or are finished), either they have expired because they waited too long for a match, or #getMatch is non-null.

One thing that makes it hard to keep throughput high: suppose PersonA is submitted at the same time as PersonB. They match each other, but PersonB also matches the already-waiting PersonC. PersonB's state changes to "available" when they are submitted, but PersonA must not accidentally grab PersonB while PersonB is being matched with PersonC. Make sense? Also, I want to do this in a way that works asynchronously. In other words, I don't want each submitter to have to hold on to a Person on a thread with a waitForMatch type thing.

Again, I don't want requests to have to run on separate threads, but it's okay if there is one additional matchmaker thread.

Seems like there should be some idioms for this, as it appears to be a pretty common thing, but my Google searches have come up dry (I may be using the wrong terms).

UPDATE

There are a couple of things that make this problem hard for me. One is that I don't want to hold objects in memory; I'd like to have all waiting candidates in Redis or memcache or something like that. The other is that any person could have several possible matches. Consider an interface like the following:

person.getId();         // let's call this an Integer
person.getFriendIds();  // a collection of other person ids

Then I have a server that looks something like this:

MatchServer:
   submit( personId, expiration ) -> void // non-blocking returns immediately
   isDone( personId ) -> boolean          // either expired or found a match
   getMatch( personId ) -> matchId        // also non-blocking

This is for a REST interface, and it would use redirects until you got to the result. My first thought was to have a cache in MatchServer backed by something like Redis, plus a concurrent weak-value hash map for objects that were currently locked and being acted on. Each personId would be wrapped by a persistent state object with states like submitted, matched, and expired.

Following so far? Pretty simple: the submit code does the initial work. It was something like this:

public void submit( Person p, long expiration ) {
    MatchStatus incoming = new MatchStatus( p.getId(), expiration );
    if ( !tryMatch( incoming, p.getFriendIds() ) )
        cache.put( p.getId(), incoming ); 
}

public boolean isDone( Integer personId ) {
    MatchStatus status = cache.get( personId );
    if ( status == null )   // already evicted after matching or expiring
        return true;

    status.lock();
    try {
        return status.isMatched() || status.isExpired();
    } finally {
        status.unlock();
    }
}

public boolean tryMatch( MatchStatus incoming, Iterable<Integer> friends ) {
    for ( Integer friend : friends ) {
        if ( match( incoming, friend ) )
            return true;
    }

    return false;
}

private boolean match( MatchStatus incoming, Integer waitingId ) {
    MatchStatus waiting = cache.get( waitingId );
    if ( waiting == null )
        return false;

    waiting.lock();
    try {
        if ( waiting.isMatched() )
            return false;

        waiting.setMatch( incoming.getId() );
        incoming.setMatch( waiting.getId() );

        return true;
    } finally {
        waiting.unlock();
    }
}

So the problem here is that if two people come in at the same time and they are each other's only match, they won't find each other. A race condition, right? The only way I could see to solve it was to synchronize tryMatch(), but that kills my throughput. I can't have tryMatch loop indefinitely, because I need these to be very short calls.

So what is a better way to approach this? Every solution I come up with forces people in one at a time, which isn't great for throughput. For example: creating a background thread and using a blocking queue, putting and taking incoming people one at a time.
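For concreteness, that serialized fallback would look something like the sketch below (the class and method names are made up for illustration; because only one matcher thread touches the waiting map, it needs no locking, but every submission funnels through that single thread):

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Sketch of the serialized fallback: submitters enqueue without blocking, and a
// single matcher drains the queue, so the waiting map needs no locking at all.
// Person, submit, and processOne are illustrative names, not an existing API.
public class MatchmakerSketch {
    static class Person {
        final int id;
        final Set<Integer> friendIds;
        volatile Integer match;                       // null until paired

        Person(int id, Integer... friends) {
            this.id = id;
            this.friendIds = new HashSet<>(Arrays.asList(friends));
        }
    }

    final BlockingQueue<Person> incoming = new LinkedBlockingQueue<>();
    final Map<Integer, Person> waiting = new HashMap<>();  // touched only by the matcher

    void submit(Person p) {                           // callers return immediately
        incoming.add(p);
    }

    // One step of the matcher loop; single-threaded, so a plain HashMap is safe.
    void processOne() {
        Person p = incoming.poll();
        if (p == null)
            return;
        for (Integer friendId : p.friendIds) {
            Person candidate = waiting.remove(friendId);
            if (candidate != null) {
                p.match = candidate.id;
                candidate.match = p.id;
                return;
            }
        }
        waiting.put(p.id, p);                         // no match yet; leave p waiting
    }

    public static void main(String[] args) {
        MatchmakerSketch m = new MatchmakerSketch();
        Person alice = new Person(1, 2);
        Person bob = new Person(2, 1);
        m.submit(alice);
        m.submit(bob);
        m.processOne();                               // alice has no waiting friend; she waits
        m.processOne();                               // bob finds alice and they pair
        System.out.println(alice.match + " " + bob.match);  // prints "2 1"
    }
}
```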

Any guidance would be greatly appreciated.

asked Oct 21 '22 by robert_difalco

1 Answer

You might be able to use a ConcurrentHashMap. I'm assuming that your objects have keys they can match on, e.g. PersonA and PersonB would both have a "Person" key.

ConcurrentHashMap<String, Match> map = new ConcurrentHashMap<>();

void addMatch(Match match) {
    boolean success = false;
    while (!success) {
        Match oldMatch = map.remove(match.key);
        if (oldMatch != null) {
            match.setMatch(oldMatch);
            success = true;
        } else if (map.putIfAbsent(match.key, match) == null) {
            success = true;
        }
    }
}

You'll keep looping until you either add the match to the map, or until you've removed an existing match and paired it. remove and putIfAbsent are both atomic.
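Here's a self-contained sketch of that loop, assuming a minimal Match wrapper (a stand-in for your real class, not an existing API). The only guarantee it relies on is that `remove` and `putIfAbsent` are each atomic:

```java
import java.util.concurrent.ConcurrentHashMap;

// A runnable sketch of the remove/putIfAbsent pairing loop. "Match" is a
// minimal stand-in for whatever wrapper surrounds a person; only the key
// and the pairing link matter here.
public class Pairing {
    static class Match {
        final String key;
        volatile Match matchedWith;                    // set when a partner is found

        Match(String key) { this.key = key; }

        void setMatch(Match other) {
            this.matchedWith = other;
            other.matchedWith = this;
        }
    }

    static final ConcurrentHashMap<String, Match> map = new ConcurrentHashMap<>();

    static void addMatch(Match match) {
        while (true) {
            Match oldMatch = map.remove(match.key);    // atomically claim any waiter
            if (oldMatch != null) {
                match.setMatch(oldMatch);              // pair with the waiter we removed
                return;
            }
            if (map.putIfAbsent(match.key, match) == null) {
                return;                                // no waiter; we become the waiter
            }
            // another thread won the race between remove and putIfAbsent; retry
        }
    }

    public static void main(String[] args) {
        Match first = new Match("person");
        Match second = new Match("person");
        addMatch(first);                               // first waits in the map
        addMatch(second);                              // second removes first and pairs
        System.out.println(first.matchedWith == second);  // prints "true"
    }
}
```

Note that a thread can never see a half-matched partner: a waiter is only reachable while it sits in the map, and it is removed atomically before anyone touches its state.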

Edit: Because you want to offload the data to disk, you can use e.g. MongoDB to this end, with its findAndModify command. If an object with the key already exists, the command removes and returns it so that you can pair the old object with the new one and presumably store the pair under a new key; if no object with the key exists, the command stores the object under the key. This is equivalent to the ConcurrentHashMap behavior except that the data is stored on disk instead of in memory; you don't need to worry about two objects writing at the same time, because findAndModify's atomicity prevents them from inadvertently occupying the same key.

Use Jackson if you need to serialize your objects to JSON.

There are alternatives to Mongo, e.g. DynamoDB, although Dynamo is only free for small amounts of data.

Edit: Given that the friends lists are not reflexive, I think you can solve this with a combination of MongoDB (or another key-value database with atomic updates) and a ConcurrentHashMap.

  1. Persons in MongoDB are either "matched" or "unmatched." (If I say "remove a person from MongoDB", I mean "set the person's state to 'matched.'")
  2. When you add a new person, first create a ConcurrentHashMap<key, boolean> for it, probably in a global ConcurrentHashMap<key, ConcurrentHashMap<key, boolean>>.
  3. Iterate through the new person's friends:
  4. If a friend is in MongoDB, then use findAndModify to atomically set it to "matched," then write the new person to MongoDB with a state of "matched," and finally add the pair to a "Pairs" collection in MongoDB that can be queried by the end user. Remove the person's ConcurrentHashMap from the global map.
  5. If the friend isn't in MongoDB, then check to see if that friend has written to the current person's associated ConcurrentHashMap. If it has, then do nothing; if it has not, then check to see if the friend has a ConcurrentHashMap associated with it; if it does, then set the value associated with the current person's key to "true." (Note that it's still possible for two friends to have written to each other's hash maps, since the current person can't check its own map and modify the friend's map in one atomic operation, but the self hash map check reduces this possibility.)
  6. If the person hasn't been matched, then write it to MongoDB in the "unmatched" state, remove its ConcurrentHashMap from the global map, and create a delayed task that will iterate through the ids of all of the friends that wrote to the person's ConcurrentHashMap (i.e. using ConcurrentHashMap#keySet()). The delay on this task should be random (e.g. Thread.sleep(500 * rand.nextInt(30))) so that two friends won't always attempt to match at the same time. If the current person doesn't have any friends that it needs to re-check, then don't create a delayed task for it.
  7. When the delay is up, create a new ConcurrentHashMap for the person, remove the unmatched person from MongoDB, and loop back to Step 1. If the person is already matched, then don't remove it from MongoDB and terminate the delayed task.

In the common case, a person either matches with a friend, or else fails to match without any friend having been added to the system while it was iterating through its list of friends (i.e. the person's ConcurrentHashMap will be empty). In the case of simultaneous writes by friends:

Friend1 and Friend2 are added at the same time.

  1. Friend1 writes to Friend2's ConcurrentHashMap to indicate that they missed each other.
  2. Friend2 writes to Friend1's ConcurrentHashMap to indicate the same (this would only occur if Friend2 were to check to see that Friend1 wrote to its map at the same time that Friend1 was writing to it - ordinarily Friend2 would detect that Friend1 had written to its map and so it would not write to Friend1's map).
  3. Friend1 and Friend2 both write to MongoDB. Friend1 randomly gets a 5 second delay on its followup task, Friend2 randomly gets a 15 second delay.
  4. Friend1's task fires first, and matches with Friend2.
  5. Friend2's task fires second; Friend2 is no longer in MongoDB, so the task immediately terminates.

A few hiccups:

  1. It's possible that Friend1 and Friend2 don't both have ConcurrentHashMaps associated with them, e.g. if Friend2 is still initializing its hash map at the time that Friend1 checks to see if the map is in memory. This is fine, because Friend2 will write to Friend1's hash map and so we're guaranteed that the match will eventually be attempted - at least one of them will have a hash map while the other is iterating, since hash map creation precedes iteration.
  2. The second iteration of a match may fail if both friends' tasks somehow fired at the same time. In this case, a person should remove friends from its list if they are in MongoDB in the matched state; they should then take the union of the resulting list with the list of friends who wrote to its ConcurrentHashMap, and then the next iteration should use this as the new friend list. Eventually the person will be matched, or else the person's "re-check" friends list will be emptied.
  3. You should increase the task delay on each subsequent iteration in order to increase the probability that two friends' tasks won't run simultaneously (e.g. Thread.sleep(500 * rand.nextInt(30)) on the first iteration, Thread.sleep(500 * rand.nextInt(60)) on the second iteration, Thread.sleep(500 * rand.nextInt(90)) on the third, etc).
  4. On subsequent iterations, you must create a person's new ConcurrentHashMap before removing the person from MongoDB, otherwise you'll have a data race. Likewise, you must remove a person from MongoDB while you're iterating through its potential matches, otherwise you might inadvertently match it twice.
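The widening randomized delay from point 3 might be computed like this (nextDelayMillis is a hypothetical helper, not from any library):

```java
import java.util.Random;

// Sketch of the widening randomized delay: each retry multiplies the random
// window, so two friends' follow-up tasks are less and less likely to fire at
// the same moment. nextDelayMillis is a made-up helper for illustration.
public class RetryDelay {
    private static final Random rand = new Random();

    // delayMultiplier starts at 1 and grows by one on every retry iteration
    static long nextDelayMillis(int delayMultiplier) {
        return 500L * rand.nextInt(30 * delayMultiplier);
    }

    public static void main(String[] args) {
        for (int iteration = 1; iteration <= 3; iteration++) {
            System.out.println("iteration " + iteration + ": window up to "
                    + (500L * (30 * iteration - 1)) + " ms, drew "
                    + nextDelayMillis(iteration) + " ms");
        }
    }
}
```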

Edit: Some code:

The method addUnmatchedToMongo(person1) writes an "unmatched" person1 to MongoDB

setToMatched(friend1) uses findAndModify to atomically set friend1 to "matched"; the method will return false if friend1 is already matched or doesn't exist, or will return true if the update was successful

isMatched(friend1) returns true if friend1 exists and is matched, and returns false if it doesn't exist or exists and is "unmatched"

private ConcurrentHashMap<String, ConcurrentHashMap<String, Person>> globalMap;
private DelayQueue<DelayedRetry> delayQueue;
private ThreadPoolExecutor executor;

executor.execute(new Runnable() {
    public void run() {
        while (true) {
            try {
                Runnable runnable = delayQueue.take();
                executor.execute(runnable);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }
});

public static void findMatch(Person person, Collection<Person> friends) {
    findMatch(person, friends, 1);
}

public static void findMatch(Person person, Collection<Person> friends, int delayMultiplier) {
    globalMap.put(person.id, new ConcurrentHashMap<String, Person>());
    for (Person friend : friends) {
        if (setToMatched(friend)) {
            // write person to MongoDB in "matched" state
            // write "Pair(person, friend)" to MongoDB so it can be queried by the end user
            globalMap.remove(person.id);
            return;
        } else {
            if (!isMatched(friend) && globalMap.get(person.id).get(friend.id) == null) {
                // the existence of "friendMap" indicates another thread is currently trying to match the friend
                ConcurrentHashMap<String, Person> friendMap = globalMap.get(friend.id);
                if (friendMap != null) {
                    friendMap.put(person.id, person);
                }
            }
        }
    }
    addUnmatchedToMongo(person);
    Collection<Person> retryFriends = globalMap.remove(person.id).values();
    if (retryFriends.size() > 0) {
        delayQueue.add(new DelayedRetry(500 * new Random().nextInt(30 * delayMultiplier), person, retryFriends, delayMultiplier));
    }
}

public class DelayedRetry implements Runnable, Delayed {
    private final long delay;
    private final Person person;
    private final Collection<Person> friends;
    private final int delayMultiplier;

    public DelayedRetry(long delay, Person person, Collection<Person> friends, int delayMultiplier) {
        this.delay = delay;
        this.person = person;
        this.friends = friends;
        this.delayMultiplier = delayMultiplier;
    }

    public long getDelay(TimeUnit unit) {
        return unit.convert(delay, TimeUnit.MILLISECONDS);
    }

    public int compareTo(Delayed other) {
        // required by Delayed (which extends Comparable<Delayed>)
        return Long.compare(getDelay(TimeUnit.MILLISECONDS), other.getDelay(TimeUnit.MILLISECONDS));
    }

    public void run() {
        findMatch(person, friends, delayMultiplier + 1);
    }
}
answered Oct 27 '22 by 18 revs