PostgreSQL insert rules for parallel transactions

We have a PostgreSQL connection pool used by a multithreaded application that constantly inserts records into a big table. So, let's say we have 10 database connections, all executing the same function, which inserts a record.

The trouble is that we end up with 10 records inserted, whereas only 2-3 should have been inserted, if only the transactions could see each other's records (our function decides not to insert a record based on the timestamp of the last matching record found).

We cannot afford to lock the table for the duration of the function. We have tried different techniques to make the database apply our rules to new records immediately, even though they are created in parallel transactions, but haven't succeeded yet.

So I would be very grateful for any help or ideas!

To be more specific, here is the code:

CREATE TABLE schm.events ( evtime TIMESTAMP, ref_id INTEGER, param INTEGER, type INTEGER );

record filter rule:

-- evtime, ref_id, param and type are the function's parameters; nCnt is a declared integer
BEGIN
  select count(*) into nCnt
  from schm.events e
  where e.ref_id = ref_id and e.param = param and e.type = type
    and e.evtime between (evtime - interval '10 seconds') and (evtime + interval '10 seconds');

  if nCnt = 0 then
    insert into schm.events values (evtime, ref_id, param, type);
  end if;
END;

UPDATE (comment length is not enough unfortunately)

I've applied the unique index solution in production. The results are fairly acceptable, but the original goal has not been achieved. The issue is that with the unique hash I cannot control the interval between two records whose hash_codes fall into consecutive buckets.

Here is the code:

CREATE TABLE schm.events_hash (
  hash_code bigint NOT NULL
);
CREATE UNIQUE INDEX ui_events_hash_hash_code ON schm.events_hash
  USING btree (hash_code);


--generate the hash code data by partitioning (splitting) evtime into 10-second intervals:
INSERT into schm.events_hash 
select distinct ( cast( trunc( extract(epoch from evtime) / 10 ) || cast( ref_id as TEXT) || cast( type as TEXT ) || cast( param as TEXT ) as bigint) )
from schm.events;

--and then in a concurrently executed function I insert sequentially:
begin
INSERT into schm.events_hash values ( cast( trunc( extract(epoch from evtime) / 10 ) || cast( ref_id as TEXT) || cast( type as TEXT ) || cast( param as TEXT ) as bigint) );
insert into schm.events values (evtime, ref_id, param, type);
end;

In that case, if evtime falls within the same hash-determined interval, only one record is inserted. The problem is that records which fall into different intervals, but are still close to each other in time (less than 60 seconds apart), slip through the filter.

insert into schm.events values ( '2013-07-22 19:32:37', '123', '10', '20' ); --inserted, test ok, (trunc( extract(epoch from cast('2013-07-22 19:32:37' as timestamp)) / 10 ) = 137450715 )
insert into schm.events values ( '2013-07-22 19:32:39', '123', '10', '20' ); --filtered out, test ok, (trunc( extract(epoch from cast('2013-07-22 19:32:39' as timestamp)) / 10 ) = 137450715 )
insert into schm.events values ( '2013-07-22 19:32:41', '123', '10', '20' ); --inserted, test fail, (trunc( extract(epoch from cast('2013-07-22 19:32:41' as timestamp)) / 10 ) = 137450716 )

I think there must be a way to modify the hash function to achieve the original goal, but I haven't found it yet. Perhaps there are table constraint expressions that are evaluated by PostgreSQL itself, outside of the transaction?

asked by xacinay
1 Answer

About your only options are:

  • Using a unique index with a hack to collapse 20-second ranges to a single value;

  • Using advisory locking to control communication; or

  • SERIALIZABLE isolation and intentionally creating a mutual dependency between sessions. Not 100% sure this will be practical in your case.

What you really want is a dirty read, but PostgreSQL does not support dirty reads, so you're kind of stuck there.

You might land up needing a co-ordinator outside the database to manage your requirements.

Unique index

You can truncate your timestamps for the purpose of uniqueness checking, rounding them to regular boundaries so they jump in 20-second chunks. Then add them to a unique index on (chunk_time_seconds(evtime, 20), ref_id, param, type).

Only one insert will succeed and the rest will fail with an error. You can trap the error in a BEGIN ... EXCEPTION block in PL/PgSQL, or preferably just handle it in the application.

I think a reasonable definition of chunk_time_seconds might be:

CREATE OR REPLACE FUNCTION chunk_time_seconds(t timestamptz, round_seconds integer)
RETURNS bigint
AS $$
SELECT (floor(extract(epoch from t) / round_seconds) * round_seconds)::bigint;
$$ LANGUAGE sql IMMUTABLE;
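
For reference, a minimal sketch of how the index and the insert path could fit together. It assumes evtime is stored as timestamptz (with a plain timestamp you would want a timestamp overload of chunk_time_seconds, since the implicit cast is not immutable and can't be used in an index expression); the index name and the insert_event wrapper below are made up for illustration:

-- unique expression index: at most one row per 20-second chunk per key
CREATE UNIQUE INDEX ui_events_chunk
  ON schm.events (chunk_time_seconds(evtime, 20), ref_id, param, type);

-- hypothetical wrapper that traps the duplicate-key error
CREATE OR REPLACE FUNCTION insert_event(
    p_evtime timestamptz, p_ref_id integer, p_param integer, p_type integer)
RETURNS boolean AS $$
BEGIN
  INSERT INTO schm.events (evtime, ref_id, param, type)
  VALUES (p_evtime, p_ref_id, p_param, p_type);
  RETURN true;       -- this session won the race
EXCEPTION WHEN unique_violation THEN
  RETURN false;      -- another transaction already inserted a row for this chunk/key
END;
$$ LANGUAGE plpgsql;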

A starting point for advisory locking:

Advisory locks can be taken on a single bigint or a pair of 32-bit integers. Your key is bigger than that, it's three integers, so you can't directly use the simplest approach of:

IF pg_try_advisory_lock(ref_id, param) THEN
   ... do insert ...
END IF;

then after 10 seconds, on the same connection (but not necessarily in the same transaction), issue pg_advisory_unlock(ref_id, param).

It won't work because you must also filter on type and there's no three-integer-argument form of pg_advisory_lock. If you can turn param and type into smallints you could:

IF pg_try_advisory_lock(ref_id, (param << 16) + type) THEN

but otherwise you're in a bit of a pickle. You could hash the values, of course, but then you run the (small) risk of incorrectly skipping an insert that should not be skipped in the case of a hash collision. There's no way to trigger a recheck because the conflicting rows aren't visible, so you can't use the usual solution of just comparing rows.

So ... if you can fit the key into 64 bits and your application can deal with the need to hold the lock for 10-20s before releasing it in the same connection, advisory locks will work for you and will be very low overhead.
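
As a rough sketch of that approach, assuming param and type have been narrowed so that each fits into 16 bits (the try_insert_event wrapper name is made up for illustration):

-- take a session-level advisory lock keyed on ref_id plus a packed param/type pair
CREATE OR REPLACE FUNCTION try_insert_event(
    p_evtime timestamp, p_ref_id integer, p_param integer, p_type integer)
RETURNS boolean AS $$
BEGIN
  IF pg_try_advisory_lock(p_ref_id, (p_param << 16) + p_type) THEN
    INSERT INTO schm.events (evtime, ref_id, param, type)
    VALUES (p_evtime, p_ref_id, p_param, p_type);
    RETURN true;
  ELSE
    RETURN false;   -- another session holds the lock, i.e. inserted recently
  END IF;
END;
$$ LANGUAGE plpgsql;

-- 10-20 seconds later, on the same connection, the application releases the lock:
-- SELECT pg_advisory_unlock(p_ref_id, (p_param << 16) + p_type);

Because the advisory lock is session-level, it survives the end of the inserting transaction, which is what allows the application to hold it for the 10-20 second window and release it later on the same connection.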

answered by Craig Ringer