I am trying to implement a reliable queue with multiple writers and multiple readers using a PostgreSQL database. How can I avoid missing rows when a queue reader scans the table and in-progress transactions commit after it has read?
We have a reader selecting rows in batches using a “checkpoint” time, where each batch gets the rows after the last timestamp in the previous batch, and we are missing rows. The reason: the timestamp value is based on the time the INSERT happens (00:00:00). Under heavy load, if the transaction takes longer, the row becomes visible only when it commits, say 10 seconds later (00:00:10). The reader will miss this row (row1) if it reads during those 10 seconds and finds a row whose INSERT time (00:00:05) is later than row1's, because its checkpoint then moves past row1's timestamp. The complete description of the problem is similar to the one in this blog post: http://blog.thefourthparty.com/stopping-time-in-postgresql/
Related prior question for context: Postgres LISTEN/NOTIFY - low latency, realtime?
Update: I have updated the question from a single reader to multiple readers. The order in which the readers read the rows does matter.
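The race described in the question can be reproduced in a single session by writing the timestamps by hand. This is only a sketch: the table demo_events and its columns are made up for illustration, and the "late" commit is simulated by inserting an older timestamp after the reader has already polled.

```sql
-- Hypothetical table; the names are illustrative, not from the question.
create temporary table demo_events
(
    id         serial primary key,
    created_at timestamp not null,
    payload    text
);

-- Row B: stamped 00:00:05 and committed right away.
insert into demo_events (created_at, payload)
values ('2024-01-01 00:00:05', 'row B');

-- The reader polls now. Row A (stamped 00:00:00) would still be inside an
-- open transaction in another session, so it is not visible yet.
select id, created_at, payload
from demo_events
where created_at > '2023-12-31 23:59:59'   -- previous checkpoint
order by created_at;
-- the reader sees only row B and advances its checkpoint to 00:00:05

-- Row A's transaction finally commits, still carrying its old timestamp.
insert into demo_events (created_at, payload)
values ('2024-01-01 00:00:00', 'row A');

-- Next poll: row A is older than the checkpoint, so it is never returned.
select id, created_at, payload
from demo_events
where created_at > '2024-01-01 00:00:05'   -- checkpoint after the first poll
order by created_at;
-- returns no rows: row A has been skipped
```

Any checkpoint based on a value assigned before commit (a timestamp or a sequence number) has this problem unless the reader can tell that an earlier value is still outstanding.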
There are existing solutions for queuing, either inside PostgreSQL, such as the venerable PGQ project, or as dedicated message brokers such as RabbitMQ and Kafka.
With multiple readers, we need to keep track of which records each reader has already received.
The order of the records also matters. So if a later transaction commits before an earlier one, we have to "stop" and resume sending records only once the earlier one has committed, so that each reader always receives the records in order.
That said, here is the implementation:
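-- let's create our queue table
drop table if exists queue_records cascade;
create table if not exists queue_records
(
    cod         serial primary key,
    -- clock_timestamp() gives the wall-clock time like timeofday(),
    -- but as a timestamp value instead of text that has to be cast
    date_posted timestamp default clock_timestamp(),
    message     text
);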

-- let's create a table to save "checkpoints" per reader_id
drop table if exists queue_reader_checkpoint cascade;
create table if not exists queue_reader_checkpoint
(
    reader_id       text primary key,
    last_checkpoint numeric
);

CREATE OR REPLACE FUNCTION get_queue_records(pREADER_ID text)
RETURNS SETOF queue_records AS
$BODY$
DECLARE
    vLAST_CHECKPOINT   numeric;
    vCHECKPOINT_EXISTS integer;
    vRECORD            queue_records%rowtype;
BEGIN
    -- let's get the last record sent to the reader
    SELECT last_checkpoint
    INTO vLAST_CHECKPOINT
    FROM queue_reader_checkpoint
    WHERE reader_id = pREADER_ID;

    -- if vLAST_CHECKPOINT is null (this is the very first call for this reader_id),
    -- set it to the last cod in the queue, so the reader gets records from now on
    if (vLAST_CHECKPOINT is null) then
        -- flag: the reader does not have a checkpoint recorded yet
        vCHECKPOINT_EXISTS = 0;
        -- get the very last committed record; fall back to 0 on an empty queue,
        -- otherwise a null checkpoint would be stored and the next call would
        -- try to insert the same reader_id again
        SELECT COALESCE(MAX(cod), 0)
        INTO vLAST_CHECKPOINT
        FROM queue_records;
    else
        -- flag: the reader already has a checkpoint recorded
        vCHECKPOINT_EXISTS = 1;
    end if;

    -- now let's get the records from the queue one by one
    FOR vRECORD IN
        SELECT *
        FROM queue_records
        WHERE cod > vLAST_CHECKPOINT
        ORDER BY cod
    LOOP
        -- if the next record equals (vLAST_CHECKPOINT + 1), it is in the expected order
        if (vRECORD.cod = (vLAST_CHECKPOINT + 1)) then
            -- let's save the last record read
            vLAST_CHECKPOINT = vRECORD.cod;
            -- and return it
            RETURN NEXT vRECORD;
        -- but if it is not, then it is out of order
        else
            -- the reason is that some transaction has not committed yet, while a later one already has,
            -- so we must stop sending records to the reader; by the next call the earlier
            -- transaction will probably have committed.
            -- caveat: a rolled-back insert also consumes a serial value, and that gap never
            -- fills, so the reader would stop here on every call
            exit;
        end if;
    END LOOP;

    -- now we have to persist the last record read, to be retrieved on the next call
    if (vCHECKPOINT_EXISTS = 0) then
        INSERT INTO queue_reader_checkpoint (reader_id, last_checkpoint)
        VALUES (pREADER_ID, vLAST_CHECKPOINT);
    else
        UPDATE queue_reader_checkpoint
        SET last_checkpoint = vLAST_CHECKPOINT
        WHERE reader_id = pREADER_ID;
    end if;
end;
$BODY$ LANGUAGE plpgsql VOLATILE;
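
A usage sketch of the function above, assuming the two tables were just (re)created by the script so that the serial column starts at 1. The reader name 'reader_a' is made up. The explicit cod values only simulate an in-flight transaction inside a single session; real writers would let the sequence assign cod.

```sql
-- one record already exists before the reader registers
insert into queue_records (message) values ('first');            -- cod = 1

-- first call: no checkpoint yet, so the reader starts after cod 1
select cod, message from get_queue_records('reader_a');
-- returns no rows; checkpoint is now 1

insert into queue_records (message) values ('second'), ('third'); -- cod = 2, 3

select cod, message from get_queue_records('reader_a');
-- returns cod 2 and 3; checkpoint is now 3

-- simulate a later transaction (cod 5) committing before an earlier one (cod 4)
insert into queue_records (cod, message) values (5, 'fifth');

select cod, message from get_queue_records('reader_a');
-- returns no rows: cod 4 is missing, so the reader stops and waits

-- the earlier transaction "commits"
insert into queue_records (cod, message) values (4, 'fourth');

select cod, message from get_queue_records('reader_a');
-- returns cod 4 and 5, in order; checkpoint is now 5

-- note: the explicit cod values above did not advance the sequence,
-- so recreate the tables before inserting through the serial default again
```

Each reader_id keeps its own row in queue_reader_checkpoint, so a second reader calling get_queue_records('reader_b') would progress independently of reader_a.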