I'm looking for a Java concurrency idiom to pair-match a large number of elements with the highest throughput.
Consider that I have "people" coming in from multiple threads. Each "person" is looking for a match. When it finds another waiting "person", the two are assigned to each other and removed for processing.
I don't want to lock a big structure to change states. Consider that Person has getMatch and setMatch. Before being submitted, each person's #getMatch is null. But when they are unblocked (or are finished), either they have expired because they waited too long for a match, or #getMatch is non-null.
One of the problems with keeping throughput high: suppose PersonA is submitted at the same time as PersonB. They match each other, but PersonB also matches the already-waiting PersonC. PersonB's state changes to "available" when they are submitted, but PersonA must not accidentally grab PersonB while PersonB is being matched with PersonC. Make sense? Also, I want to do this in a way that works asynchronously. In other words, I don't want each submitter to have to hold on to a Person on a thread with a waitForMatch type call.
Again, I don't want requests to have to run on separate threads, but it's okay if there is one additional matchmaker thread.
It seems like there should be some idioms for this, as it appears to be a pretty common problem, but my Google searches have come up dry (I may be using the wrong terms).
UPDATE
There are a couple of things that make this problem hard for me. One is that I don't want to hold the objects in memory; I'd like to have all waiting candidates in redis or memcached or something like that. The other is that any person could have several possible matches. Consider an interface like the following:
person.getId();        // let's call this an Integer
person.getFriendIds(); // a collection of other person ids
Then I have a server that looks something like this:
MatchServer:
    submit( personId, expiration ) -> void    // non-blocking, returns immediately
    isDone( personId ) -> boolean             // either expired or found a match
    getMatch( personId ) -> matchId           // also non-blocking
This is for a REST interface, and it would use redirects until you got to the result. My first thought was to have a Cache in MatchServer backed by something like redis, plus a concurrent weak-value hash map for the state objects that are currently locked and being acted on. Each personId would be wrapped by a persistent state object with states like submitted, matched, and expired.
Following so far? Pretty simple: the submit code does the initial work, and looks something like this:
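To make that concrete, here's a minimal sketch of the state wrapper I have in mind (MatchStatus is my own illustrative name; the real version would be serialized to redis/memcached rather than held in memory):

```java
import java.util.concurrent.locks.ReentrantLock;

// Illustrative per-person state wrapper: "matched" when matchId is set,
// "expired" when the deadline passes without a match, otherwise "submitted".
public class MatchStatus {
    private final ReentrantLock lock = new ReentrantLock();
    private final Integer id;
    private final long expiration; // deadline in epoch millis
    private volatile Integer matchId; // null until matched

    public MatchStatus(Integer id, long expiration) {
        this.id = id;
        this.expiration = expiration;
    }

    public void lock()   { lock.lock(); }
    public void unlock() { lock.unlock(); }

    public Integer getId()          { return id; }
    public Integer getMatch()       { return matchId; }
    public void setMatch(Integer m) { this.matchId = m; }
    public boolean isMatched()      { return matchId != null; }
    public boolean isExpired() {
        return !isMatched() && System.currentTimeMillis() > expiration;
    }

    public static void main(String[] args) {
        MatchStatus a = new MatchStatus(1, System.currentTimeMillis() + 60_000);
        if (a.isMatched() || a.isExpired()) throw new AssertionError();
        a.setMatch(2);
        if (!a.isMatched()) throw new AssertionError();
        System.out.println("OK");
    }
}
```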
public void submit( Person p, long expiration ) {
    MatchStatus incoming = new MatchStatus( p.getId(), expiration );
    if ( !tryMatch( incoming, p.getFriendIds() ) )
        cache.put( p.getId(), incoming );
}

public boolean isDone( Integer personId ) {
    MatchStatus status = cache.get( personId );
    status.lock();
    try {
        return status.isMatched() || status.isExpired();
    } finally {
        status.unlock();
    }
}

public boolean tryMatch( MatchStatus incoming, Iterable<Integer> friends ) {
    for ( Integer friend : friends ) {
        if ( match( incoming, friend ) )
            return true;
    }
    return false;
}

private boolean match( MatchStatus incoming, Integer waitingId ) {
    MatchStatus waiting = cache.get( waitingId );
    if ( waiting == null )
        return false;
    waiting.lock();
    try {
        if ( waiting.isMatched() )
            return false;
        waiting.setMatch( incoming.getId() );
        incoming.setMatch( waiting.getId() );
        return true;
    } finally {
        waiting.unlock();
    }
}
So the problem here is that if two people come in at the same time and they are each other's only match, they won't find each other. A race condition, right? The only way I could see to solve it was to synchronize tryMatch(), but that kills my throughput. I can't have tryMatch loop indefinitely because I need these calls to be very short.
So what is a better way to approach this? Every solution I come up with forces people in one at a time, which isn't great for throughput. For example, creating a background thread and using a blocking queue to put and take incoming people one at a time.
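For concreteness, here is roughly what that one-at-a-time fallback looks like (QueueMatcher and Submission are just names I made up for this sketch). Every submission funnels through the single matcher thread, which is the bottleneck I'd like to avoid:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.LinkedBlockingQueue;

// The "one matchmaker thread" fallback: all submissions are serialized
// through a queue, so matching itself needs no locking at all.
public class QueueMatcher implements Runnable {
    static final class Submission {
        final int id;
        final Set<Integer> friends;
        Submission(int id, Set<Integer> friends) { this.id = id; this.friends = friends; }
    }

    private final BlockingQueue<Submission> incoming = new LinkedBlockingQueue<>();
    private final Map<Integer, Submission> waiting = new HashMap<>(); // matcher thread only
    private final ConcurrentMap<Integer, Integer> matches = new ConcurrentHashMap<>();

    public void submit(int id, Set<Integer> friends) { // non-blocking for callers
        incoming.add(new Submission(id, friends));
    }

    public Integer getMatch(int id) { return matches.get(id); }

    @Override public void run() {
        try {
            while (!Thread.currentThread().isInterrupted()) {
                Submission s = incoming.take();
                Integer friend = s.friends.stream()
                        .filter(waiting::containsKey).findFirst().orElse(null);
                if (friend != null) {         // a friend is waiting: pair them up
                    waiting.remove(friend);
                    matches.put(s.id, friend);
                    matches.put(friend, s.id);
                } else {                      // nobody available yet: wait
                    waiting.put(s.id, s);
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) throws Exception {
        QueueMatcher m = new QueueMatcher();
        Thread t = new Thread(m);
        t.setDaemon(true);
        t.start();
        m.submit(1, Set.of(2, 3));
        m.submit(2, Set.of(1));
        while (m.getMatch(1) == null) Thread.sleep(10); // wait for the matcher
        if (m.getMatch(1) != 2 || m.getMatch(2) != 1) throw new AssertionError();
        System.out.println("OK");
    }
}
```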
Any guidance would be greatly appreciated.
You might be able to use a ConcurrentHashMap. I'm assuming that your objects have keys they can match on, e.g. PersonA and PersonB would have a "Person" key.
ConcurrentHashMap<String, Match> map = new ConcurrentHashMap<>();

void addMatch(Match match) {
    boolean success = false;
    while(!success) {
        Match oldMatch = map.remove(match.key);
        if(oldMatch != null) {
            match.setMatch(oldMatch);
            success = true;
        } else if(map.putIfAbsent(match.key, match) == null) {
            success = true;
        }
    }
}
You'll keep looping until you either add the match to the map, or until you've removed an existing match and paired it. remove and putIfAbsent are both atomic.
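Here is a self-contained version of that loop, assuming a minimal stand-in Match class (your real class presumably carries more state). Whichever interleaving occurs, two threads submitting the same key end up paired with each other:

```java
import java.util.concurrent.ConcurrentHashMap;

// Demo of the remove/putIfAbsent pairing loop with two concurrent submitters.
public class MatchDemo {
    static final class Match {
        final String key;
        volatile Match match; // the partner, once paired
        Match(String key) { this.key = key; }
        void setMatch(Match other) { this.match = other; other.match = this; }
    }

    static final ConcurrentHashMap<String, Match> map = new ConcurrentHashMap<>();

    static void addMatch(Match match) {
        boolean success = false;
        while (!success) {
            Match oldMatch = map.remove(match.key);
            if (oldMatch != null) {
                match.setMatch(oldMatch);      // pair with the waiting entry
                success = true;
            } else if (map.putIfAbsent(match.key, match) == null) {
                success = true;                // now waiting for a partner
            }
            // else: another thread slipped in between remove() and
            // putIfAbsent(); retry the loop
        }
    }

    public static void main(String[] args) throws InterruptedException {
        Match a = new Match("person");
        Match b = new Match("person");
        Thread t1 = new Thread(() -> addMatch(a));
        Thread t2 = new Thread(() -> addMatch(b));
        t1.start(); t2.start();
        t1.join(); t2.join();
        // Only one entry per key can ever sit in the map, so the second
        // arrival always removes and pairs with the first.
        if (a.match != b || b.match != a) throw new AssertionError();
        System.out.println("OK");
    }
}
```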
Edit: Because you want to offload the data to disk, you can use e.g. MongoDB to this end, with its findAndModify method. If an object with the key already exists, then the command will remove and return it so that you can pair the old object with the new object and presumably store the pair associated with a new key; if an object with the key doesn't exist, then the command stores the object with the key. This is equivalent to the behavior of ConcurrentHashMap, except that the data is stored on disk instead of in memory; you don't need to worry about two objects writing at the same time, because the findAndModify logic prevents them from inadvertently occupying the same key.

Use Jackson if you need to serialize your objects to JSON.

There are alternatives to Mongo, e.g. DynamoDB, although Dynamo is only free for small amounts of data.
Edit: Given that the friends lists are not reflexive, I think you can solve this with a combination of MongoDB (or another key-value database with atomic updates) and a ConcurrentHashMap.

1. When a new person is submitted, create a ConcurrentHashMap<key, boolean> for it, probably in a global ConcurrentHashMap<key, ConcurrentHashMap<key, boolean>>.
2. Iterate through the person's friends; if a friend is waiting in MongoDB as "unmatched," use findAndModify to atomically set it to "matched," then write the new person to MongoDB with a state of "matched," and finally add the pair to a "Pairs" collection in MongoDB that can be queried by the end user. Remove the person's ConcurrentHashMap from the global map.
3. Otherwise, check whether the friend has already written to the person's own ConcurrentHashMap; if it has, then do nothing; if it has not, then check to see if the friend has a ConcurrentHashMap associated with it; if it does, then set the value associated with the current person's key to "true." (Note that it's still possible for two friends to have written to each others' hash maps since the current person can't check its own map and modify the friend's map with one atomic operation, but the self hash map check reduces this possibility.)
4. If the person ends up unmatched, write it to MongoDB as "unmatched," remove its ConcurrentHashMap from the global map, and create a delayed task that will iterate through the ids of all of the friends that wrote to the person's ConcurrentHashMap (i.e. using ConcurrentHashMap#keySet()). The delay on this task should be random (e.g. Thread.sleep(500 * rand.nextInt(30))) so that two friends won't always attempt to match at the same time. If the current person doesn't have any friends that it needs to re-check, then don't create a delayed task for it.

In the common case, a person either matches with a friend, or else fails to match without a friend having been added to the system while iterating through the list of friends (i.e. the person's ConcurrentHashMap will be empty). In the case of simultaneous writes by friends:

- Friend1 and Friend2 are added at the same time.
- Friend1 writes to Friend2's ConcurrentHashMap to indicate that they missed each other.
- Friend2 may likewise write to Friend1's ConcurrentHashMap to indicate the same (this would only occur if Friend2 were to check to see that Friend1 wrote to its map at the same time that Friend1 was writing to it; ordinarily Friend2 would detect that Friend1 had written to its map and so it would not write to Friend1's map).

A few hiccups:

- Two simultaneously added friends may not both have ConcurrentHashMaps associated with them, e.g. if Friend2 is still initializing its hash map at the time that Friend1 checks to see if the map is in memory. This is fine, because Friend2 will write to Friend1's hash map and so we're guaranteed that the match will eventually be attempted; at least one of them will have a hash map while the other is iterating, since hash map creation precedes iteration.
- On a failed retry, new friends may have been added to the person's ConcurrentHashMap, and the next iteration should use these as the new friend list. Eventually the person will be matched, or else the person's "re-check" friends list will be emptied.
- To reduce contention, grow the random delay on each retry (e.g. Thread.sleep(500 * rand.nextInt(30)) on the first iteration, Thread.sleep(500 * rand.nextInt(60)) on the second iteration, Thread.sleep(500 * rand.nextInt(90)) on the third, etc).
- You must create the person's ConcurrentHashMap before removing the person from MongoDB, otherwise you'll have a data race. Likewise, you must remove a person from MongoDB while you're iterating through its potential matches, otherwise you might inadvertently match it twice.

Edit: Some code:
- The method addUnmatchedToMongo(person1) writes an "unmatched" person1 to MongoDB.
- setToMatched(friend1) uses findAndModify to atomically set friend1 to "matched"; the method returns false if friend1 is already matched or doesn't exist, or true if the update was successful.
- isMatched(friend1) returns true if friend1 exists and is matched, and false if it doesn't exist or exists and is "unmatched."
private static ConcurrentHashMap<String, ConcurrentHashMap<String, Person>> globalMap;
private static DelayQueue<DelayedRetry> delayQueue;
private static ThreadPoolExecutor executor;

// a single consumer thread moves expired retries from the delay queue to the executor
executor.execute(new Runnable() {
    public void run() {
        while(true) {
            try {
                executor.execute(delayQueue.take());
            } catch(InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }
});

public static void findMatch(Person person, Collection<Person> friends) {
    findMatch(person, friends, 1);
}

public static void findMatch(Person person, Collection<Person> friends, int delayMultiplier) {
    globalMap.put(person.id, new ConcurrentHashMap<String, Person>());
    for(Person friend : friends) {
        if(setToMatched(friend)) {
            // write person to MongoDB in "matched" state
            // write "Pair(person, friend)" to MongoDB so it can be queried by the end user
            globalMap.remove(person.id);
            return;
        }
        if(!isMatched(friend) && globalMap.get(person.id).get(friend.id) == null) {
            // the existence of "friendMap" indicates another thread is currently trying to match the friend
            ConcurrentHashMap<String, Person> friendMap = globalMap.get(friend.id);
            if(friendMap != null) {
                friendMap.put(person.id, person);
            }
        }
    }
    addUnmatchedToMongo(person);
    Collection<Person> retryFriends = globalMap.remove(person.id).values();
    if(retryFriends.size() > 0) {
        delayQueue.add(new DelayedRetry(500 * new Random().nextInt(30 * delayMultiplier), person, retryFriends, delayMultiplier));
    }
}

public class DelayedRetry implements Runnable, Delayed {
    private final long expiry; // absolute time at which the retry becomes eligible
    private final Person person;
    private final Collection<Person> friends;
    private final int delayMultiplier;

    public DelayedRetry(long delay, Person person, Collection<Person> friends, int delayMultiplier) {
        this.expiry = System.currentTimeMillis() + delay;
        this.person = person;
        this.friends = friends;
        this.delayMultiplier = delayMultiplier;
    }

    public long getDelay(TimeUnit unit) {
        // remaining (not original) delay, as the Delayed contract requires
        return unit.convert(expiry - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
    }

    public int compareTo(Delayed other) {
        return Long.compare(getDelay(TimeUnit.MILLISECONDS), other.getDelay(TimeUnit.MILLISECONDS));
    }

    public void run() {
        findMatch(person, friends, delayMultiplier + 1);
    }
}
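One detail worth calling out: DelayQueue relies on getDelay returning the time remaining until a fixed expiry, and on compareTo ordering elements by that expiry. A minimal self-contained illustration of that contract (Task is a made-up stand-in for DelayedRetry):

```java
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

// Demonstrates the Delayed contract used by DelayedRetry: getDelay reports
// remaining time until an absolute expiry; compareTo orders by expiry.
public class DelayDemo {
    static final class Task implements Delayed {
        final String name;
        final long expiry; // absolute time in millis
        Task(String name, long delayMillis) {
            this.name = name;
            this.expiry = System.currentTimeMillis() + delayMillis;
        }
        public long getDelay(TimeUnit unit) {
            return unit.convert(expiry - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
        }
        public int compareTo(Delayed other) {
            return Long.compare(getDelay(TimeUnit.MILLISECONDS),
                                other.getDelay(TimeUnit.MILLISECONDS));
        }
    }

    public static void main(String[] args) throws InterruptedException {
        DelayQueue<Task> queue = new DelayQueue<>();
        queue.add(new Task("second", 200));
        queue.add(new Task("first", 50));
        // take() blocks until a task's delay has elapsed; shorter delay wins,
        // regardless of insertion order.
        if (!queue.take().name.equals("first")) throw new AssertionError();
        if (!queue.take().name.equals("second")) throw new AssertionError();
        System.out.println("OK");
    }
}
```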