I want to use a table in Postgres database as a storage for input documents (there will be billions of them). Documents are being continuously added (using "UPSERT" logic to avoid duplicates), and rarely are removed from the table.
There will be multiple worker apps that should continuously read data from this table, from the first inserted row to the latest, and then poll new rows as they being inserted, reading each row exactly once. Also, when worker's processing algorithm changes, all the data should be reread from the first row. Each app should be able to maintain its own row processing progress, independent of other apps.
I'm looking for a way to track last processed row, to be able to pause and continue polling at any moment.
I can think of these options:
And then store the autoincrement field value of the last processed row somewhere, to use it in a next query like this:
SELECT * FROM document WHERE id > :last_processed_id LIMIT 100;
But after some research I found that in a concurrent environment, it is possible that rows with lower autoincrement values will become visible to clients LATER than rows with higher values, so some rows could be skipped.
The problem with this option is timestamps are not unique and could overlap during high insertion rate, what, once again, leads to skipped rows. Also, adjusting system time (manually or by NTP) may lead to unpredicted results.
This is the only actually reliable way to do this I could think of, but there are drawbacks to it, including the need to update each row after it was processed and extra storage needed to store the completion flag field for each app, and running a new app may require DB schema change. This is the last resort for me, I'd like to avoid it if there are more elegant ways to do this.
I know, the task definition screams I should use Kafka for this, but the problem with it is it doesn't allow to delete single messages from a topic, and I need this functionality. Keeping an external list of Kafka records that should be skipped during processing feels very clumsy and inefficient to me. Also, a real-time deduplication with Kafka would also require some external storage.
I'd like to know if there are other, more efficient approaches to this problem using the Postgres DB.
I ended up saving the transaction id for each row and then selecting records that have txid value lower than the transaction with smallest id to the moment like this:
SELECT * FROM document
WHERE ((txid = :last_processed_txid AND id > :last_processed_id) OR txid > :last_processed_txid)
AND txid < pg_snapshot_xmin(pg_current_snapshot())
ORDER BY txid, id
LIMIT 100
This way, even if Transaction #2, that was started after Transaction #1, completes faster than the first one, the rows it written won't be read by a consumer until the Transaction #1 finishes.
Postgres docs state that
xid8 values increase strictly monotonically and cannot be reused in the lifetime of a database cluster
so it should fit my case.
This solution is not that space-efficient, because an extra 8-byte txid field must be saved with each row, and an index for the txid field should be created, but the main benefits over other methods here are:
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With