I am wondering whether it's possible to query transactions using a standard SELECT query.
For example:
SELECT * FROM information_schema.transaction_logs
WHERE table_name = 'product' AND time_stamp > '2016-01-01';
The result would be something like:
===> table_name | operation | old_val_json | new_val_json...
product | update | {....desc:...} | {...desc...}|
The query does not work because there is no such table as transaction_logs, but does something similar to this exist?
You can query the write-ahead log (WAL) stream through a logical replication slot.
First, you need to change a couple of parameters and then restart the server for the changes to take effect:
postgres=# alter system set wal_level = logical;
postgres=# alter system set max_replication_slots = 1;
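After the restart, it may be worth confirming that both settings were actually picked up before creating a slot. A minimal check, assuming the same psql session and superuser rights as above:

```sql
-- Should report "logical"; any other value means the restart has not
-- happened yet or the setting is overridden elsewhere.
SHOW wal_level;

-- Should report at least 1, otherwise creating a slot will fail.
SHOW max_replication_slots;
```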
Then (after restart) you need to create a slot:
postgres=# SELECT * FROM pg_create_logical_replication_slot('slot', 'test_decoding');
slot_name | xlog_position
-----------+---------------
slot | 2E/839F3300
(1 row)
Here, test_decoding is the name of an output plugin, which converts the (binary) log records to a text representation.
Then let's create a table...
postgres=# create table product(id serial, val json);
CREATE TABLE
Now you can query the WAL stream:
postgres=# SELECT * FROM pg_logical_slot_get_changes('slot', NULL, NULL);
location | xid | data
-------------+-------+--------------
2E/83A0BA48 | 80243 | BEGIN 80243
2E/83A1D2B8 | 80243 | COMMIT 80243
(2 rows)
Unfortunately, DDL cannot currently be decoded, so you get just BEGIN and COMMIT. The xid field is the transaction ID.
But let's insert something...
postgres=# insert into product(val) values ('{"desc":"aaa"}');
INSERT 0 1
Now query the stream again:
postgres=# SELECT * FROM pg_logical_slot_get_changes('slot', NULL, NULL);
location | xid | data
-------------+-------+------------------------------------------------------------------------
2E/83A1D3C0 | 80244 | BEGIN 80244
2E/83A1D3C0 | 80244 | table public.product: INSERT: id[integer]:1 val[json]:'{"desc":"aaa"}'
2E/83A1D440 | 80244 | COMMIT 80244
(3 rows)
Here you can see the table name and inserted values.
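To get closer to the WHERE clause in the question, you can filter the decoded rows like any other result set. The sketch below assumes the slot named 'slot' from above and uses pg_logical_slot_peek_changes (which reads without consuming) together with the test_decoding option include-timestamp, which appends the commit time to each COMMIT line. The LIKE patterns simply match the text format shown above and are an illustration, not a robust parser:

```sql
-- Show only changes to public.product, plus the COMMIT lines that
-- carry the commit timestamp for each transaction.
SELECT *
FROM pg_logical_slot_peek_changes('slot', NULL, NULL,
                                  'include-timestamp', 'on')
WHERE data LIKE 'table public.product:%'
   OR data LIKE 'COMMIT %';
```

Filtering on a timestamp would still mean parsing it out of the COMMIT text, since the stream has no separate time column.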
The same for an update statement:
postgres=# update product set val = '{"desc":"bbb"}';
UPDATE 1
postgres=# SELECT * FROM pg_logical_slot_get_changes('slot', NULL, NULL);
location | xid | data
-------------+-------+------------------------------------------------------------------------
2E/83A1D560 | 80245 | BEGIN 80245
2E/83A1D560 | 80245 | table public.product: UPDATE: id[integer]:1 val[json]:'{"desc":"bbb"}'
2E/83A1D5E8 | 80245 | COMMIT 80245
(3 rows)
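Note that the UPDATE line above carries only the new values. If you also need the old values, as in the question, one option is to set the table's replica identity to FULL so that the previous row version is written to WAL. A sketch, assuming the product table and slot from above; as far as I know, test_decoding then prints the old row under old-key: and the new one under new-tuple:, but check the output on your version:

```sql
-- Log the complete old row for UPDATE and DELETE (this increases WAL volume).
ALTER TABLE product REPLICA IDENTITY FULL;

UPDATE product SET val = '{"desc":"ccc"}';

-- The UPDATE line should now contain both the old and the new row.
SELECT * FROM pg_logical_slot_get_changes('slot', NULL, NULL);
```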
Note that once you "consume" changes from the stream with the pg_logical_slot_get_changes function, you cannot query the same changes again.
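If you want to look at pending changes without consuming them, pg_logical_slot_peek_changes takes the same arguments and leaves the slot position untouched. A short sketch, again assuming the slot is called 'slot':

```sql
-- Returns the pending changes; running it twice returns the same rows.
SELECT * FROM pg_logical_slot_peek_changes('slot', NULL, NULL);

-- The third argument is a soft limit on the number of rows returned
-- (it is checked at transaction boundaries); here, roughly 10.
SELECT * FROM pg_logical_slot_peek_changes('slot', NULL, 10);
```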
Drop the slot when you no longer need it; as long as a slot exists, the server retains the WAL that the slot has not yet consumed:
postgres=# SELECT pg_drop_replication_slot('slot');
pg_drop_replication_slot
--------------------------
(1 row)
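To confirm that the slot is gone, or to spot forgotten slots that are still holding back WAL, you can list them from the pg_replication_slots view. A minimal query:

```sql
-- One row per existing slot; an empty result means no slots remain.
SELECT slot_name, plugin, slot_type, active, restart_lsn
FROM pg_replication_slots;
```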
You can read more about logical decoding in the documentation.