 

Reliable way to poll data from a Postgres table

I want to use a table in a Postgres database as storage for input documents (there will be billions of them). Documents are continuously added (using "UPSERT" logic to avoid duplicates) and are rarely removed from the table.

There will be multiple worker apps that should continuously read data from this table, from the first inserted row to the latest, and then poll for new rows as they are inserted, reading each row exactly once. Also, when a worker's processing algorithm changes, all the data should be re-read from the first row. Each app should be able to maintain its own row-processing progress, independent of the other apps.

I'm looking for a way to track the last processed row, so that I can pause and resume polling at any moment.

I can think of these options:

Using an autoincrement field

And then store the autoincrement field value of the last processed row somewhere, to use it in the next query, like this:

SELECT * FROM document WHERE id > :last_processed_id LIMIT 100;

But after some research I found that in a concurrent environment, rows with lower autoincrement values can become visible to clients LATER than rows with higher values (the transaction that claimed the lower value may commit later), so some rows could be skipped.
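To make the failure mode concrete, here is the kind of interleaving I mean (a hypothetical scenario, assuming id comes from a sequence):

-- Session A: BEGIN; INSERT INTO document ...;                  -- allocates id = 101, commit is delayed
-- Session B: BEGIN; INSERT INTO document ...; COMMIT;          -- allocates id = 102, commits first
-- Consumer:  SELECT * FROM document WHERE id > 100 LIMIT 100;  -- sees only 102, stores 102 as last_processed_id
-- Session A: COMMIT;                                           -- id 101 becomes visible, but the watermark is already past it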

Using a timestamp field

The problem with this option is that timestamps are not unique and could overlap during a high insertion rate, which, once again, leads to skipped rows. Also, adjusting the system time (manually or via NTP) may lead to unpredictable results.
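For reference, the polling query for this option would look roughly like this (a sketch, assuming a created_at timestamptz column):

SELECT * FROM document
WHERE created_at > :last_processed_ts  -- '>' skips rows that share the watermark timestamp, '>=' re-reads them
ORDER BY created_at
LIMIT 100;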

Add a process completion flag to each row

This is the only truly reliable way I could think of, but it has drawbacks: each row must be updated after it is processed, extra storage is needed for a completion flag field per app, and adding a new app may require a DB schema change. This is a last resort for me; I'd like to avoid it if there are more elegant ways.
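For completeness, a sketch of what this option would look like, with a hypothetical per-app flag column:

ALTER TABLE document ADD COLUMN processed_by_app1 boolean NOT NULL DEFAULT false;

-- every poll then has to both read and write the table
SELECT * FROM document WHERE NOT processed_by_app1 LIMIT 100;
UPDATE document SET processed_by_app1 = true WHERE id = ANY(:processed_ids);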

I know the task definition screams Kafka, but the problem is that Kafka doesn't allow deleting individual messages from a topic, and I need that functionality. Keeping an external list of Kafka records that should be skipped during processing feels very clumsy and inefficient to me. Also, real-time deduplication with Kafka would require some external storage as well.

I'd like to know if there are other, more efficient approaches to this problem using the Postgres DB.

asked Sep 16 '25 by Ultranium


1 Answer

I ended up saving the transaction id for each row and then selecting only the records whose txid is lower than the xmin of the current snapshot (i.e. lower than the smallest transaction id still in progress at the moment of the query), like this:

SELECT * FROM document
WHERE ((txid = :last_processed_txid AND id > :last_processed_id) OR txid > :last_processed_txid) 
  AND txid < pg_snapshot_xmin(pg_current_snapshot())
ORDER BY txid, id
LIMIT 100

This way, even if Transaction #2, which was started after Transaction #1, completes faster than the first one, the rows it wrote won't be read by a consumer until Transaction #1 finishes.

Postgres docs state that

xid8 values increase strictly monotonically and cannot be reused in the lifetime of a database cluster

so it should fit my case.
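For reference, the supporting schema can look roughly like this: one way to populate the column is a default on pg_current_xact_id() (available since Postgres 13, like pg_current_snapshot()), and the index matches the ORDER BY of the polling query:

ALTER TABLE document
  ADD COLUMN txid xid8 NOT NULL DEFAULT pg_current_xact_id();

CREATE INDEX document_txid_id_idx ON document (txid, id);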

This solution is not that space-efficient, because an extra 8-byte txid field must be saved with each row and an index on the txid field should be created, but the main benefits over the other methods are:

  • The DB schema stays the same when new consumers are added
  • No updates are needed to mark a row as processed; a consumer only needs to keep the id and txid values of the last processed row
  • System clock drift or adjustment won't lead to rows being skipped
  • Having the txid for each row also helps to query data in insertion order when multiple producers insert rows with ids generated from preallocated pools (for example, Producer 1 is currently inserting rows with ids 1..100, Producer 2 with ids 101..200, and so on)
answered Sep 17 '25 by Ultranium