I have an application that can support a certain number of concurrent actions. This is represented by a table of "slots" in postgres. When nodes come online, they insert a number of rows into the table, one per slot. As jobs claim the slots, they update a row in the table claiming one of the slots and release it again as they finish.
The slots table looks like this:
CREATE TABLE slots (
id INT8 PRIMARY KEY DEFAULT nextval('slots_seq'),
node_name TEXT NOT NULL,
job_name TEXT
);
At any time it has some semi-fixed number of rows each of which may or may not have a job_name filled in.
When a new job wants to start up, it runs these queries to get the name of the node it should run on:
BEGIN;
LOCK TABLE slots IN ACCESS EXCLUSIVE MODE;
SELECT id, node_name
FROM slots
WHERE job_name IS NULL
LIMIT 1
FOR UPDATE;
(the node_name and id are read out of the cursor)
UPDATE slots
SET job_name = %(job_name)s
WHERE id = %(slot_id)s;
COMMIT;
This is often able to claim rows without losing any updates but with higher levels of concurrency, only a few rows will be claimed while many SELECT ... FOR UPDATE and UPDATE queries have been executed. The net result is that we end up with far more jobs running than there are slots for them.
Am I making a locking error? Is there a better way to go about this? Something that doesn't use table locks?
Transaction level SERIALIZABLE does not cut it, only a handful of rows are ever filled.
I'm using postgresql version 8.4.
BEGIN;
LOCK TABLE slots IN ACCESS EXCLUSIVE MODE;
UPDATE slots SET job_name = '111' WHERE id IN (SELECT id FROM slots WHERE job_name IS NULL LIMIT 1) RETURNING *;
COMMIT;
This seems to work in Read Committed. It is only sql (same as your code) and can be executed in one call (faster).
@Seth Robertson: It is not safe without LOCK TABLE and without while loop.
If there is transaction A and transaction B at same time: A will select first row and B will select first row. A will lock and update row, B have to wait until A commit. Then B will recheck condition job_name IS NULL. It is false and B will not update - B will not select next row but will only recheck and return empty result.
@joegester: SELECT FOR UPDATE is not the problem because all table is locked.
Maybe there is another way to do job - if you delete and insert rows (in other table?) instead setting NULL. But I am not sure how.
Well, I wrote a program in perl to simulate what was going on since I didn't think that what you were saying was possible. Indeed after running my simulation I didn't have any problems even when I turned locking off (since SELECT … FOR UPDATE and UPDATE should do the necessary locking).
I ran this on PG 8.3 and PG 9.0 and it worked fine on both locations.
I urge you to try the program and/or try a python version to have a nice tight test-case which you can share with the class. If it does work, you can investigate what the differences are and if it doesn't work, you have something that other people can play with.
#!/usr/bin/perl
use DBI;
$numchild = 0;
$SIG{CHLD} = sub { if (wait) {$numchild--;} };
sub worker($)
{
my ($i) = @_;
my ($job);
my $dbh = DBI->connect("dbi:Pg:host=localhost",undef,undef,{'RaiseError'=>0, 'AutoCommit'=>0});
my ($x) = 0;
while(++$x)
{
# $dbh->do("lock table slots in access exclusive mode;") || die "Cannot lock at $i\n";
my @id = $dbh->selectrow_array("select id from slots where job_name is NULL LIMIT 1 FOR UPDATE;");
if ($#id < 0)
{
$dbh->rollback;
sleep(.5);
next;
}
$job = "$$-$i-($x)";
$dbh->do("update slots set job_name='$job' where id=$id[0];") || die "Cannot update at $i\n";
$dbh->commit || die "Cannot commit\n";
last;
}
if (!$job)
{
print STDERR "Could not find slots in 5 attempts for $i $$\n" if ($ENV{'verbose'});
return;
}
else
{
print STDERR "Got $job\n" if ($ENV{'verbose'} > 1);
}
sleep(rand(5));
# $dbh->do("lock table slots in access exclusive mode;") || die "Cannot lock at $i\n";
$dbh->do("update slots set usage=usage+1, job_name = NULL where job_name='$job';") || die "Cannot unlock $job";
print STDERR "Unlocked $job\n" if ($ENV{'verbose'} > 2);
$dbh->commit || die "Cannot commit";
}
my $dbh = DBI->connect("dbi:Pg:host=localhost",undef,undef,{'RaiseError'=>0, 'AutoCommit'=>0});
$dbh->do("drop table slots;");
$dbh->commit;
$dbh->do("create table slots (id serial primary key, job_name text, usage int);") || die "Cannot create\n";
$dbh->do("insert into slots values (DEFAULT,NULL,0), (DEFAULT,NULL,0), (DEFAULT,NULL,0), (DEFAULT,NULL,0), (DEFAULT,NULL,0), (DEFAULT,NULL,0), (DEFAULT,NULL,0), (DEFAULT,NULL,0), (DEFAULT,NULL,0), (DEFAULT,NULL,0);") || die "Cannot insert";
$dbh->commit;
for(my $i=0;$i<200;$i++)
{
if (!fork)
{
worker($i);
exit(0);
}
if (++$numchild > 50)
{
sleep(1);
}
}
while (wait > 0)
{
$numchild--;
print "Waiting numchild $numchild\n";
sleep(1);
}
my $dbh = DBI->connect("dbi:Pg:host=localhost",undef,undef,{'RaiseError'=>0, 'AutoCommit'=>0});
my $slots = $dbh->selectall_arrayref("select * from slots;") || die "Cannot do final select";
my $sum=0;
foreach my $slot (@$slots)
{
printf("%02d %3d %s\n",$slot->[0], $slot->[2], $slot->[1]);
$sum += $slot->[2];
}
print "Successfully made $sum entries\n";
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With