I'm using pthreads to try to parallelize Dijkstra's shortest-path algorithm, but I'm running into a deadlock I can't figure out. The gist is that every thread has its own priority queue (a std::multiset) that it takes work from, plus a mutex for that queue that is locked whenever the queue is modified.
Every node has an owner thread, given by the node ID modulo the thread count. When a thread scans a node's neighbors and lowers one neighbor's weight (its label), it locks that neighbor's owner's queue, removes the neighbor and reinserts it (this forces the set to update the neighbor's position in the queue). However, this implementation seems to deadlock, and I can't tell why: as far as I can tell, each thread holds only one lock at a time.
Each thread's initial queue contains all of its nodes, but every node's weight besides the source is initialized to ULONG_MAX. If a thread is out of work (it's getting nodes with ULONG_MAX weight from the queue) it just keeps locking and unlocking until another thread gives it work.
void *Dijkstra_local_owner_worker(void *param){
    struct thread_args *myargs = ((struct thread_args *)param);
    int tid = myargs->tid;
    std::multiset<Node *,cmp_label> *Q = (myargs->Q);
    struct thread_args *allargs = ((struct thread_args *)param)-tid;
    AdjGraph *G = (AdjGraph *)allargs[thread_count].Q;
    struct Node *n, *p;
    int owner;
    std::set<Edge>::iterator it;
    Edge e;
    pthread_mutex_lock(&myargs->mutex);
    while(!Q->empty()){
        n = *Q->begin(); Q->erase(Q->begin());
        pthread_mutex_unlock(&myargs->mutex);
        if(n->label == ULONG_MAX){
            pthread_mutex_lock(&myargs->mutex);
            Q->insert(n);
            continue;
        }
        for( it = n->edges->begin(); it != n->edges->end(); it++){
            e = *it;
            p = G->getNode(e.dst);
            owner = (int)(p->index % thread_count);
            if(p->label > n->label + e.weight){
                pthread_mutex_lock(&(allargs[owner].mutex));
                allargs[owner].Q->erase(p);
                p->label = n->label + e.weight;
                p->prev = n;
                allargs[owner].Q->insert(p);//update p's position in the PQ
                pthread_mutex_unlock(&(allargs[owner].mutex));
            }
        }
        pthread_mutex_lock(&myargs->mutex);
    }
    pthread_mutex_unlock(&myargs->mutex);
    return NULL;
}
Here's the function that spawns the threads.
bool Dijkstra_local_owner(AdjGraph *G, struct Node *src){
    G->setAllNodeLabels(ULONG_MAX);
    struct thread_args args[thread_count+1];
    src->label = 0;
    struct Node *n;
    for(int i=0; i<thread_count; i++){
        args[i].Q = new std::multiset<Node *,cmp_label>;
        args[i].tid = i;
        pthread_mutex_init(&args[i].mutex,NULL);
    }
    for(unsigned long i = 0; i < G->n; i++){
        n = G->getNode(i); //give all threads their workload in advance
        args[(n->index)%thread_count].Q->insert(n);
    }
    args[thread_count].Q = (std::multiset<Node *,cmp_label> *)G;
    //hacky repackaging of a pointer to prevent use of globals
    //please note this works and is not the issue. I know it's horrible.
    pthread_t threads[thread_count];
    for(int i=0; i< thread_count; i++){
        pthread_create(&threads[i],NULL,Dijkstra_local_owner_worker,&args[i]);
    }
    for(int i=0; i< thread_count; i++){
        pthread_join(threads[i],NULL);
    }
    for(int i=0; i< thread_count; i++){
        delete args[i].Q;
    }
    return true; //the function is declared bool, so it must return a value
}
The structure definition for each thread's arguments:
struct thread_args{
    std::multiset<Node *,cmp_label> *Q;
    pthread_mutex_t mutex;
    int tid;
};
My question is: where does this code deadlock? I've got tunnel vision and can't see where I'm going wrong. I've checked that the rest of the logic works, so pointer dereferences and the like are correct.
Quoting the question: "If a thread is out of work (it's getting nodes with ULONG_MAX weight from the queue) it just keeps locking and unlocking until another thread gives it work."
This is a potential problem: once a thread gets into this state, it keeps the mutex locked for almost its entire timeslice, releasing it only for an instant before locking it again. pthreads mutexes are not guaranteed to be fair, so it's quite possible (likely, even) that the busy-waiting thread re-acquires the lock before a woken waiting thread is able to acquire it. The threads that want to give it work then stay blocked on that mutex, which looks exactly like a deadlock.
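You should use pthread_cond_wait() here, and have the condition variable signalled when another thread updates the queue. That means adding a pthread_cond_t cond field to struct thread_args and initialising it with pthread_cond_init() next to the mutex. The start of your loop would then look something like: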
pthread_mutex_lock(&myargs->mutex);
while (!Q->empty())
{
    n = *Q->begin();
    if (n->label == ULONG_MAX)
    {
        pthread_cond_wait(&myargs->cond, &myargs->mutex);
        continue; /* Re-check the condition after `pthread_cond_wait()` returns */
    }
    Q->erase(Q->begin());
    pthread_mutex_unlock(&myargs->mutex);
    /* ... */
And the point where you update another thread's queue would look like:
/* ... */
allargs[owner].Q->insert(p); //update p's position in the PQ
pthread_cond_signal(&allargs[owner].cond);
pthread_mutex_unlock(&allargs[owner].mutex);
Your code looks something like this:
lock()
while (cond)
{
    unlock()
    if (cond1)
    {
        lock()
        continue
    }
    for (...)
    {
        ....
    }
    lock()
}
unlock()
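I think it's easy to see that this approach can cause problems depending on which path through the loop is taken: the mutex is released and re-acquired at different points on each path, so it's hard to verify that every path leaves the lock in the state the next iteration expects.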
I would use the lock only for critical operations:
lock()
Q->erase(..)
unlock()
OR
lock()
Q->insert(..)
unlock()
Try to simplify things and see if that helps.