Pipes and filters at DBMS-level: Splitting the MERGE output stream

Scenario

We have a pretty standard data import process in which we load a staging table, then MERGE it into a target table.

New requirements (green) involve capturing a subset of the imported data into a separate queue table for completely unrelated processing.

Scenario schema

The "challenge"

(1) The subset consists of a selection of the records: those that were newly inserted into the target table only.

(2) The subset is a projection of some of the inserted columns, but also at least one column that is only present in the source (the staging table).

(3) The MERGE statement already uses the OUTPUT..INTO clause strictly to record the $actions taken by MERGE, so that we can PIVOT the result and COUNT the number of insertions, updates and deletions for statistics purposes. We don't really enjoy buffering the actions for the entire dataset like that and would prefer aggregating the sums on the fly. Needless to say, we don't want to add more data to this OUTPUT table.

(4) We don't want to repeat, even partially, the matching work that the MERGE already performs. The target table is really big, we can't index everything, and the operation is generally quite expensive (minutes, not seconds).

(5) We're not considering roundtripping any output from the MERGE to the client just so that the client can route it to the queue by sending it back immediately. The data has to stay on the server.

(6) We wish to avoid buffering the entire dataset in temporary storage between staging and the queue.

What would be the best way of going about it?

Failures

(a) The requirement to enqueue only the inserted records prevents us from targeting the queue table directly in an OUTPUT..INTO clause of the MERGE, as it doesn't allow any WHERE clause. We can use some CASE trickery to mark the unwanted records for subsequent deletion from the queue without processing, but this seems crazy.
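For illustration, the rejected CASE trickery would look roughly like this (a sketch only; the Discard column and table names are hypothetical):

```sql
-- Hypothetical sketch of the rejected idea: OUTPUT..INTO cannot be filtered,
-- so every row lands in the queue and non-inserted rows are marked for deletion.
MERGE INTO dbo.Target AS Dst
USING dbo.Staging AS Src
    ON Dst.foo = Src.foo
WHEN MATCHED THEN
    UPDATE SET Dst.bar = Src.bar
WHEN NOT MATCHED BY TARGET THEN
    INSERT (foo, bar) VALUES (Src.foo, Src.bar)
OUTPUT
    Src.foo,
    Src.baz,
    CASE WHEN $action = 'INSERT' THEN 0 ELSE 1 END  -- mark unwanted rows
INTO dbo.Queue (foo, baz, Discard);

-- ...then immediately purge the marked rows:
DELETE FROM dbo.Queue WHERE Discard = 1;
```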

(b) Because some columns intended for the queue don't appear in the target table, we cannot simply add an insertion trigger on the target table to load the queue. The "data flow split" has to happen sooner.

(c) Since we already use an OUTPUT..INTO clause in the MERGE, we cannot add a second OUTPUT clause and nest the MERGE into an INSERT..SELECT to load the queue either. This is a shame, because it feels like a completely arbitrary limitation for something that otherwise works very well: the SELECT filters only the records with the $action we want (INSERT) and INSERTs them into the queue in a single statement. Thus, the DBMS could theoretically avoid buffering the whole dataset and simply stream it into the queue. (Note: we didn't pursue this, and it's likely the plan wouldn't actually have been optimized this way.)

Situation

We feel we've exhausted our options, but decided to turn to the hivemind to be sure. All we can come up with is:

(S1) Create a VIEW of the target table that also contains nullable columns for the data intended for the queue only, and have the SELECT statement define them as NULL. Then, set up INSTEAD OF triggers that populate both the target table and the queue appropriately. Finally, wire the MERGE to target the view. This works, but we're not fans of the construct -- it definitely looks tricky.
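A minimal sketch of S1 (all names hypothetical; note that SQL Server requires INSTEAD OF triggers for every DML action the MERGE uses against a view, so the MATCHED branch needs one too):

```sql
-- The view exposes an extra, always-NULL column for the queue-only data.
CREATE VIEW dbo.TargetForMerge
AS
SELECT foo, bar, CAST(NULL AS varchar(10)) AS baz
FROM dbo.Target;
GO
CREATE TRIGGER dbo.TargetForMerge_ioi
ON dbo.TargetForMerge
INSTEAD OF INSERT
AS
BEGIN
    SET NOCOUNT ON;
    INSERT INTO dbo.Target (foo, bar)
    SELECT foo, bar FROM inserted;
    INSERT INTO dbo.Queue (foo, baz)
    SELECT foo, baz FROM inserted;   -- baz carries the staging-only data
END;
GO
CREATE TRIGGER dbo.TargetForMerge_iou
ON dbo.TargetForMerge
INSTEAD OF UPDATE
AS
BEGIN
    SET NOCOUNT ON;
    UPDATE t SET t.bar = i.bar       -- updates bypass the queue entirely
    FROM dbo.Target AS t
    JOIN inserted AS i ON i.foo = t.foo;
END;
GO
```

The MERGE then targets dbo.TargetForMerge and supplies baz in its INSERT column list so the trigger can see it.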

(S2) Give up and buffer the entire dataset in a temporary table using another MERGE..OUTPUT. After the MERGE, immediately copy the data (again!) from the temporary table into the queue.

tne asked Dec 30 '15


3 Answers

My understanding is that the main obstacle is the limitation of the OUTPUT clause in SQL Server: it allows one OUTPUT ... INTO a table and/or one OUTPUT that returns a result set to the caller.

You want to save the outcome of the MERGE statement in two different ways:

  • all rows that were affected by the MERGE, for gathering statistics
  • only the inserted rows, for the queue

Simple variant

I would use your S2 solution, at least to start with. It is easy to understand and maintain and should be quite efficient, because the most resource-intensive operation (the MERGE into Target itself) is performed only once. There is a second variant below and it would be interesting to compare their performance on real data.

So:

  • Use OUTPUT INTO @TempTable in the MERGE
  • Either INSERT all rows from @TempTable into Stats, or aggregate before inserting. If all you need is aggregated statistics, it makes sense to aggregate the results of this batch and merge them into the final Stats instead of copying all rows.
  • INSERT into Queue only "inserted" rows from @TempTable.

I'll take sample data from the answer by @i-one.

Schema

-- I'll return to commented lines later

CREATE TABLE [dbo].[TestTarget](
    -- [ID] [int] IDENTITY(1,1) NOT NULL,
    [foo] [varchar](10) NULL,
    [bar] [varchar](10) NULL
);

CREATE TABLE [dbo].[TestStaging](
    [foo] [varchar](10) NULL,
    [bar] [varchar](10) NULL,
    [baz] [varchar](10) NULL
);

CREATE TABLE [dbo].[TestStats](
    [MergeAction] [nvarchar](10) NOT NULL
);

CREATE TABLE [dbo].[TestQueue](
    -- [TargetID] [int] NOT NULL,
    [foo] [varchar](10) NULL,
    [baz] [varchar](10) NULL
);

Sample data

TRUNCATE TABLE [dbo].[TestTarget];
TRUNCATE TABLE [dbo].[TestStaging];
TRUNCATE TABLE [dbo].[TestStats];
TRUNCATE TABLE [dbo].[TestQueue];

INSERT INTO [dbo].[TestStaging]
    ([foo]
    ,[bar]
    ,[baz])
VALUES
    ('A', 'AA', 'AAA'),
    ('B', 'BB', 'BBB'),
    ('C', 'CC', 'CCC');

INSERT INTO [dbo].[TestTarget]
    ([foo]
    ,[bar])
VALUES
    ('A', 'A_'),
    ('B', 'B?');

Merge

DECLARE @TempTable TABLE (
    MergeAction nvarchar(10) NOT NULL,
    foo varchar(10) NULL,
    baz varchar(10) NULL);

MERGE INTO TestTarget AS Dst
USING TestStaging AS Src
ON Dst.foo = Src.foo
WHEN MATCHED THEN
UPDATE SET
    Dst.bar = Src.bar
WHEN NOT MATCHED BY TARGET THEN
INSERT (foo, bar)
VALUES (Src.foo, Src.bar)
OUTPUT $action AS MergeAction, inserted.foo, Src.baz
INTO @TempTable(MergeAction, foo, baz)
;

INSERT INTO [dbo].[TestStats] (MergeAction)
SELECT T.MergeAction
FROM @TempTable AS T;

INSERT INTO [dbo].[TestQueue]
    ([foo]
    ,[baz])
SELECT
    T.foo
    ,T.baz
FROM @TempTable AS T
WHERE T.MergeAction = 'INSERT'
;

SELECT * FROM [dbo].[TestTarget];
SELECT * FROM [dbo].[TestStats];
SELECT * FROM [dbo].[TestQueue];

Result

TestTarget
+-----+-----+
| foo | bar |
+-----+-----+
| A   | AA  |
| B   | BB  |
| C   | CC  |
+-----+-----+

TestStats
+-------------+
| MergeAction |
+-------------+
| INSERT      |
| UPDATE      |
| UPDATE      |
+-------------+

TestQueue
+-----+-----+
| foo | baz |
+-----+-----+
| C   | CCC |
+-----+-----+

Second variant

Tested on SQL Server 2014 Express.

The OUTPUT clause can send its result set to a table and to the caller at the same time. So, OUTPUT ... INTO can go into Stats directly, and if we wrap the MERGE statement into a stored procedure, we can use INSERT ... EXEC to load the Queue.

If you examine the execution plan you'll see that INSERT ... EXEC creates a temporary table behind the scenes anyway (see also The Hidden Costs of INSERT EXEC by Adam Machanic), so I expect that overall performance would be similar to the first variant, where the temporary table is created explicitly.

One more problem to solve: the Queue table should contain only "inserted" rows, not all affected rows. To achieve that you could use a trigger on the Queue table to discard rows other than "inserted". Another possibility is to define a unique index with IGNORE_DUP_KEY = ON and prepare the data in such a way that "non-inserted" rows would violate the unique index and would not be inserted into the table.
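The trigger alternative could look like this (a sketch only; it assumes the same convention as the index variant below, where "non-inserted" rows arrive with TargetID = 0):

```sql
-- Alternative to IGNORE_DUP_KEY: filter the stream in an INSTEAD OF trigger.
CREATE TRIGGER dbo.TestQueue_FilterInserts
ON dbo.TestQueue
INSTEAD OF INSERT
AS
BEGIN
    SET NOCOUNT ON;
    INSERT INTO dbo.TestQueue (TargetID, foo, baz)
    SELECT TargetID, foo, baz
    FROM inserted
    WHERE TargetID <> 0;   -- discard rows that were not MERGE inserts
END;
```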

So, I'll add an ID IDENTITY column to the Target table and I'll add a TargetID column to the Queue table. (Uncomment them in the script above). Also, I'll add an index to the Queue table:

CREATE UNIQUE NONCLUSTERED INDEX [IX_TargetID] ON [dbo].[TestQueue]
(
    [TargetID] ASC
) WITH (
PAD_INDEX = OFF, 
STATISTICS_NORECOMPUTE = OFF, 
SORT_IN_TEMPDB = OFF, 
IGNORE_DUP_KEY = ON, 
DROP_EXISTING = OFF, 
ONLINE = OFF, 
ALLOW_ROW_LOCKS = ON, 
ALLOW_PAGE_LOCKS = ON)

Important part is UNIQUE and IGNORE_DUP_KEY = ON.

Here is the stored procedure for the MERGE:

CREATE PROCEDURE [dbo].[TestMerge]
AS
BEGIN
    SET NOCOUNT ON;
    SET XACT_ABORT ON;

    MERGE INTO dbo.TestTarget AS Dst
    USING dbo.TestStaging AS Src
    ON Dst.foo = Src.foo
    WHEN MATCHED THEN
    UPDATE SET
        Dst.bar = Src.bar
    WHEN NOT MATCHED BY TARGET THEN
    INSERT (foo, bar)
    VALUES (Src.foo, Src.bar)
    OUTPUT $action INTO dbo.TestStats(MergeAction)
    OUTPUT CASE WHEN $action = 'INSERT' THEN inserted.ID ELSE 0 END AS TargetID, 
    inserted.foo,
    Src.baz
    ;

END

Usage

TRUNCATE TABLE [dbo].[TestTarget];
TRUNCATE TABLE [dbo].[TestStaging];
TRUNCATE TABLE [dbo].[TestStats];
TRUNCATE TABLE [dbo].[TestQueue];

-- Make sure that `Queue` has one special row with TargetID=0 in advance.
INSERT INTO [dbo].[TestQueue]
    ([TargetID]
    ,[foo]
    ,[baz])
VALUES
    (0
    ,NULL
    ,NULL);

INSERT INTO [dbo].[TestStaging]
    ([foo]
    ,[bar]
    ,[baz])
VALUES
    ('A', 'AA', 'AAA'),
    ('B', 'BB', 'BBB'),
    ('C', 'CC', 'CCC');

INSERT INTO [dbo].[TestTarget]
    ([foo]
    ,[bar])
VALUES
    ('A', 'A_'),
    ('B', 'B?');

INSERT INTO [dbo].[TestQueue]
EXEC [dbo].[TestMerge];

SELECT * FROM [dbo].[TestTarget];
SELECT * FROM [dbo].[TestStats];
SELECT * FROM [dbo].[TestQueue];

Result

TestTarget
+----+-----+-----+
| ID | foo | bar |
+----+-----+-----+
|  1 | A   | AA  |
|  2 | B   | BB  |
|  3 | C   | CC  |
+----+-----+-----+

TestStats
+-------------+
| MergeAction |
+-------------+
| INSERT      |
| UPDATE      |
| UPDATE      |
+-------------+

TestQueue
+----------+------+------+
| TargetID | foo  | baz  |
+----------+------+------+
|        0 | NULL | NULL |
|        3 | C    | CCC  |
+----------+------+------+

There will be an extra message during INSERT ... EXEC:

Duplicate key was ignored.

if MERGE updated some rows. This warning message is sent when the unique index discards some rows during the INSERT due to IGNORE_DUP_KEY = ON. From the documentation:

A warning message will occur when duplicate key values are inserted into a unique index. Only the rows violating the uniqueness constraint will fail.

Vladimir Baranov answered Oct 21 '22


Consider the following two approaches to solve the problem:

  • Merge data into the target and output the inserted rows into the queue in a single statement, then summarize statistics in a trigger created on the target. The batch identifier can be passed into the trigger via a temporary table.
  • Merge data into the target and output the inserted rows into the queue in a single statement, then summarize statistics immediately after the merge using the built-in change tracking capabilities, instead of doing it in the trigger.

Approach 1 (merge data and gather statistics in the trigger):

Sample data setup (indexes and constraints omitted for simplicity):

create table staging (foo varchar(10), bar varchar(10), baz varchar(10));
create table target (foo varchar(10), bar varchar(10));
create table queue (foo varchar(10), baz varchar(10));
create table stats (batchID int, inserted bigint, updated bigint, deleted bigint);

insert into staging values
    ('A', 'AA', 'AAA')
    ,('B', 'BB', 'BBB')
    ,('C', 'CC', 'CCC')
    ;

insert into target values
    ('A', 'A_')
    ,('B', 'B?')
    ,('E', 'EE')
    ;

Trigger for gathering inserted/updated/deleted statistics:

create trigger target_onChange
on target
after delete, update, insert
as
begin
    set nocount on;

    if object_id('tempdb..#targetMergeBatch') is NULL
        return;

    declare @batchID int;
    select @batchID = batchID from #targetMergeBatch;

    merge into stats t
    using (
        select
            batchID = @batchID,
            cntIns = count_big(case when i.foo is not NULL and d.foo is NULL then 1 end),
            cntUpd = count_big(case when i.foo is not NULL and d.foo is not NULL then 1 end),
            cntDel = count_big(case when i.foo is NULL and d.foo is not NULL then 1 end)
        from inserted i
            full join deleted d on d.foo = i.foo
    ) s
    on t.batchID = s.batchID
    when matched then
        update
        set
            t.inserted = t.inserted + s.cntIns,
            t.updated = t.updated + s.cntUpd,
            t.deleted = t.deleted + s.cntDel
    when not matched then
        insert (batchID, inserted, updated, deleted)
        values (s.batchID, s.cntIns, s.cntUpd, s.cntDel);

end

Merge statements:

declare @batchID int;
set @batchID = 1;-- or select @batchID = batchID from ...;

create table #targetMergeBatch (batchID int);
insert into #targetMergeBatch (batchID) values (@batchID);

insert into queue (foo, baz)
select foo, baz
from
(
    merge into target t
    using staging s
    on t.foo = s.foo
    when matched then
        update
        set t.bar = s.bar
    when not matched then
        insert (foo, bar)
        values (s.foo, s.bar)
    when not matched by source then
        delete
    output $action, inserted.foo, s.baz
) m(act, foo, baz)
where act = 'INSERT'
    ;

drop table #targetMergeBatch

Check the results:

select * from target;
select * from queue;
select * from stats;

Target:

foo        bar
---------- ----------
A          AA
B          BB
C          CC

Queue:

foo        baz
---------- ----------
C          CCC

Stats:

batchID  inserted   updated   deleted
-------- ---------- --------- ---------
1        1          2         1

Approach 2 (gather statistics, using change tracking capabilities):

Sample data setup is the same as in the previous case (just drop everything, including the trigger, and recreate the tables from scratch), except that in this case we need a PK on target to make the sample work:

create table target (foo varchar(10) primary key, bar varchar(10));

Enable change tracking on database:

alter database Test
    set change_tracking = on

Enable change tracking on target table:

alter table target
    enable change_tracking

Merge data and grab statistics immediately after that, filtering by the change context to count only rows affected by merge:

begin transaction;
declare @batchID int, @chVersion bigint, @chContext varbinary(128);
set @batchID = 1;-- or select @batchID = batchID from ...;
SET @chVersion = change_tracking_current_version();
set @chContext = newid();

with change_tracking_context(@chContext)
insert into queue (foo, baz)
select foo, baz
from
(
    merge into target t
    using staging s
    on t.foo = s.foo
    when matched then
        update
        set t.bar = s.bar
    when not matched then
        insert (foo, bar)
        values (s.foo, s.bar)
    when not matched by source then
        delete
    output $action, inserted.foo, s.baz
) m(act, foo, baz)
where act = 'INSERT'
    ;

with ch(foo, op) as (
    select foo, sys_change_operation
    from changetable(changes target, @chVersion) ct
    where sys_change_context = @chContext
)
insert into stats (batchID, inserted, updated, deleted)
select @batchID, [I], [U], [D]
from ch
    pivot(count_big(foo) for op in ([I], [U], [D])) pvt
    ;

commit transaction;

Check the results:

select * from target;
select * from queue;
select * from stats;

They are the same as in the previous sample.

Target:

foo        bar
---------- ----------
A          AA
B          BB
C          CC

Queue:

foo        baz
---------- ----------
C          CCC

Stats:

batchID  inserted   updated   deleted
-------- ---------- --------- ---------
1        1          2         1
i-one answered Oct 21 '22


I suggest extracting the stats by coding three independent AFTER INSERT / UPDATE / DELETE triggers along the lines of:

create trigger dbo.insert_trigger_target
on [dbo].[target]
after insert
as
insert into dbo.[stats] ([action],[count])
select 'insert', count(1)
from inserted;
go

create trigger dbo.update_trigger_target
on [dbo].[target]
after update
as
insert into dbo.[stats] ([action],[count])
select 'update', count(1) from inserted -- or deleted == after / before image, count will be the same
go

create trigger dbo.delete_trigger_target
on [dbo].[target]
after delete
as
insert into dbo.[stats] ([action],[count])
select 'delete', count(1) from deleted
go

If you need more context, put something in CONTEXT_INFO and pluck it out in the triggers.
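A sketch of that hand-off (hedged: the 4-byte layout for the batch id is an assumption, and SET CONTEXT_INFO only accepts a variable or constant, hence the intermediate variable):

```sql
-- In the batch that runs the MERGE: stash a batch id in CONTEXT_INFO.
DECLARE @batchID int = 42;
DECLARE @ci varbinary(128) = CAST(@batchID AS varbinary(128));
SET CONTEXT_INFO @ci;

-- Inside a trigger: read it back out.
DECLARE @ctx varbinary(128) = CONTEXT_INFO();
DECLARE @batch int = CAST(SUBSTRING(@ctx, 1, 4) AS int);
```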

Now, I'm going to assert that the AFTER triggers are not that expensive, but you'll need to test that to be sure.

Having dealt with that, you'll be free to use the OUTPUT clause (not OUTPUT INTO) in the MERGE, and then nest the MERGE inside a SELECT to subset the data that you want to go into the queue table.

Justification

Because of the need to access columns from both staging and target in order to build the data for queue, this HAS to be done using the OUTPUT option in MERGE, since nothing else has access to "both sides".

Then, if we have hijacked the OUTPUT clause for the queue, how can we re-work that functionality? I think the AFTER triggers will work, given the requirements for stats that you have described. Indeed, the stats could be quite complex if required, given the before and after images that are available. I'm asserting that the AFTER triggers are "not that expensive" since the before and after data must always be available anyway so that a transaction can be either committed or rolled back -- yes, the data needs to be scanned (even to get the count), but that doesn't seem like too much of a cost.

In my own analysis that scan added about 5% to the execution plan's base cost.

Sound like a solution?

dsz answered Oct 21 '22