 

How do I efficiently handle multiple inserts into an array?

I have a dynamically allocated array of integers into which I want to insert integers at arbitrary positions. Many integers, as in more than 2.5 million.

My code currently looks like this:

uses
  Windows; // for CopyMemory / MoveMemory

type
  TIntegerArr = array of Integer;

var
  FCount: Integer;
  FSortedList: TIntegerArr;

procedure Insert(_Value: Integer; _InsertPos: integer);
var
  OldList: TIntegerArr;
begin
  OldList := FSortedList;
  if Length(FSortedList) < FCount + 1 then begin
    OldList := FSortedList;
    FSortedList := nil;
    SetLength(FSortedList, FCount + 100);
    CopyMemory(@FSortedList[0], @OldList[0], SizeOf(Integer) * _InsertPos);
  end;
  MoveMemory(@FSortedList[_InsertPos + 1], @OldList[_InsertPos], SizeOf(Integer) * (FCount - _InsertPos));
  FSortedList[_InsertPos] := _Value;
  Inc(FCount);
end;

(The real code is a method of a class that has FSortedList and FCount as fields.)

Using a temporary list and Move rather than a for loop for moving the data already improved the performance quite a lot, because it prevents the array from being copied twice when it has to grow (once in SetLength on the existing array and again in Move).

But the worst case, Insert(SomeValue, 0), still always moves all existing values.

So far I have been thinking along the lines of introducing an offset at the start of the array, so that rather than moving all existing values every time a new value is inserted at the front, I would only have to do that when the offset reaches 0. E.g.:

// simple case: inserting at position 0:
if FOffset = 0 then begin
  // [...] reallocate a new array as above, leaving 100 free slots at the front
  Move(OldList[0], FSortedList[100], SizeOf(Integer) * FCount);
  FOffset := 100;
end;
Dec(FOffset);
FSortedList[FOffset] := _NewValue;

(This code is untested and probably still buggy.) This can of course be extended to check whether the insertion point is nearer to the beginning or the end and, depending on that, move either the leading or the trailing values by one position, so that on average only 1/4 of the entries has to be moved rather than 1/2 as it currently is; a sketch of that two-sided variant follows below.
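
Here is a minimal sketch of that two-sided variant (my own illustration, not tested against the real code): it assumes FOffset free slots are kept at the front, so the live values occupy FSortedList[FOffset..FOffset + FCount - 1], and it omits the capacity/reallocation handling.

// Hypothetical two-sided insert: _InsertPos is the logical position
// (0..FCount) among the live values.
procedure InsertTwoSided(_Value: Integer; _InsertPos: Integer);
begin
  if (_InsertPos < FCount div 2) and (FOffset > 0) then begin
    // nearer the front: shift the leading values one slot to the left
    Move(FSortedList[FOffset], FSortedList[FOffset - 1],
      SizeOf(Integer) * _InsertPos);
    Dec(FOffset);
  end else begin
    // nearer the back: shift the trailing values one slot to the right
    Move(FSortedList[FOffset + _InsertPos],
      FSortedList[FOffset + _InsertPos + 1],
      SizeOf(Integer) * (FCount - _InsertPos));
  end;
  FSortedList[FOffset + _InsertPos] := _Value;
  Inc(FCount);
end;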

Another option would be implementing a sparse array. I remember seeing such an implementation in some commercial library back in the 1990s but don't remember which it was (TurboPower?).
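
For illustration, here is a deliberately simple paged-array sketch of the idea (my own construction, not StColl's actual implementation): elements live in fixed-size pages, so an insert shifts at most one page's worth of integers. Locating the page is still a linear walk over the page list here; a production version would index the pages.

uses Classes; // for TList

const
  PageSize = 1024;

type
  TPage = class
  public
    Items: array[0..PageSize - 1] of Integer;
    Count: Integer;
  end;

  TPagedIntList = class
  private
    FPages: TList; // list of TPage, in order
  public
    constructor Create;
    destructor Destroy; override;
    procedure Insert(Value, Pos: Integer);
  end;

constructor TPagedIntList.Create;
begin
  inherited Create;
  FPages := TList.Create;
  FPages.Add(TPage.Create); // always keep at least one page
end;

destructor TPagedIntList.Destroy;
var
  i: Integer;
begin
  for i := 0 to FPages.Count - 1 do
    TPage(FPages[i]).Free;
  FPages.Free;
  inherited;
end;

procedure TPagedIntList.Insert(Value, Pos: Integer);
var
  PageIdx: Integer;
  Page, NewPage: TPage;
begin
  // find the page containing logical position Pos
  PageIdx := 0;
  while (PageIdx < FPages.Count - 1) and (Pos > TPage(FPages[PageIdx]).Count) do begin
    Dec(Pos, TPage(FPages[PageIdx]).Count);
    Inc(PageIdx);
  end;
  Page := TPage(FPages[PageIdx]);

  if Page.Count = PageSize then begin
    // page is full: split it, moving the upper half into a new page
    NewPage := TPage.Create;
    NewPage.Count := PageSize div 2;
    Page.Count := PageSize - NewPage.Count;
    Move(Page.Items[Page.Count], NewPage.Items[0], NewPage.Count * SizeOf(Integer));
    FPages.Insert(PageIdx + 1, NewPage);
    if Pos > Page.Count then begin
      Dec(Pos, Page.Count);
      Page := NewPage;
    end;
  end;

  // shift within a single page: at most PageSize - 1 integers move
  Move(Page.Items[Pos], Page.Items[Pos + 1], (Page.Count - Pos) * SizeOf(Integer));
  Page.Items[Pos] := Value;
  Inc(Page.Count);
end;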

This procedure is central to some sorting and indexing code that works on arrays of different sizes, from just a few dozen entries up to the above-mentioned millions.

Currently the program runs for about 2 hours (before my optimizations it was close to 5 hours), and I already know that the number of entries in the array is going to at least double. Since each insert moves, on average, half of the existing entries, the total work grows roughly quadratically with the number of entries, so I suspect that with double the number of entries the run time will at least quadruple.

I would like some suggestions on how to tune the performance. Memory consumption is currently not much of an issue but run time definitely is.

(This is Delphi 2007 but that should not make much of a difference unless newer Delphi versions already have an optimized library for doing the above. Classes.TList is not optimized.)

Edit1: Just found the sparse array implementation I mentioned above: It's StColl from TurboPower SysTools.

Edit2: OK, some background: my program reads a DBase table with currently 2.4 million entries and generates several new tables from these entries. The new tables are normalized and are indexed after they have been created (for performance reasons I don't generate the indexes before inserting the data; trust me, I tried that first). The array is the central piece of code that provides internal sorting for the generated tables. New records are only appended to the table, but their RecNo is inserted into the array in sorted order (the binary-search sketch below shows how such a position would typically be found).
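
For context, the insertion position comes from searching the sorted array. The question doesn't show that part; a standard binary search over FSortedList would look roughly like this:

// Hypothetical helper (not shown in the question): returns the index at
// which _Value must be inserted to keep FSortedList[0..FCount-1] sorted.
function FindInsertPos(_Value: Integer): Integer;
var
  Lo, Hi, Mid: Integer;
begin
  Lo := 0;
  Hi := FCount - 1;
  while Lo <= Hi do begin
    Mid := (Lo + Hi) div 2;
    if FSortedList[Mid] < _Value then
      Lo := Mid + 1
    else
      Hi := Mid - 1;
  end;
  Result := Lo; // first index whose value is >= _Value
end;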

asked Sep 30 '13 by dummzeuch


2 Answers

Immediately after looking at your procedure I noticed some flaws. To see progress, I first measured the speed of your existing procedure in the worst-case scenario (always inserting at position 0):

n := 500000;
for i := 0 to n - 1 do
  Insert(i, 0);

Measurement: n=500000 47.6 ms
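
The answer doesn't show the measurement code; a harness along these lines would reproduce it (my assumption; it presumes a console application and uses QueryPerformanceCounter from the Windows unit):

uses Windows, SysUtils;

procedure Benchmark;
var
  Freq, T0, T1: Int64;
  i, n: Integer;
begin
  n := 500000;
  FCount := 0;
  QueryPerformanceFrequency(Freq);
  QueryPerformanceCounter(T0);
  for i := 0 to n - 1 do
    Insert(i, 0);             // worst case: every insert lands at position 0
  QueryPerformanceCounter(T1);
  WriteLn(Format('n=%d  %.1f ms', [n, (T1 - T0) * 1000.0 / Freq]));
end;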

A) Simplicity

I deleted some unnecessary lines from your procedure (OldList is completely unnecessary; SetLength preserves the existing contents).

Improvement A:

procedure Insert(_Value: Integer; _InsertPos: integer);
begin
 if Length(FSortedList) < FCount + 1
    then SetLength(FSortedList, FCount + 100);
  Move(FSortedList[_InsertPos], FSortedList[_InsertPos+1], SizeOf(Integer) * (FCount - _InsertPos));
  FSortedList[_InsertPos] := _Value;
  Inc(FCount);
end;

Speed gain 6% (44.8 ms)

B) Everything counts

if Length(FSortedList) < FCount + 1 
   then SetLength(FSortedList, FCount + 100);
  • Tip 1: the Length function is called on every insert
  • Tip 2: FCount + 1 is calculated on every insert
  • Tip 3: declare the procedure parameters as const
  • Tip 4: introduce an FCapacity variable
  • Tip 5: increasing the length by just 100 will cause a lot of reallocations (25,000 of them for a 2.5 million entry array). As you say, memory is not the problem, so why not preallocate everything, or at least a large chunk?

Improvement B:

procedure Insert(const _Value, _InsertPos: integer);
begin
 if FCount = FCapacity
    then begin
     Inc(FCapacity, 100000);
     SetLength(FSortedList, FCapacity);
    end;
 Move(FSortedList[_InsertPos], FSortedList[_InsertPos+1], SizeOf(Integer) * (FCount - _InsertPos));
 FSortedList[_InsertPos] := _Value;
 Inc(FCount);
end;

Speed gain 1% (44.3 ms).

Hint: Instead of a fixed Inc by 100000 you can implement a progressive growth algorithm (see the Grow code under "Last comments" below).

C) Bottleneck

If we look at the procedure now, there is nothing left, just a lot of memory moves. If we can't change the algorithm, we must improve the memory move.

There was actually a FastMove challenge (fastcode.sourceforge.net).

I prepared a zip, with just those files you need (3 files, source code). Link >>> http://www.dakte.org/_stackoverflow/files/delphi-fastcode.zip

  • Add fastcodeCPUID.pas and fastmove.pas to your project!
  • Add fastmove to your uses clause!
  • Voila! Nothing else to change!
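
Assuming both units from the zip are on the project's search path, the whole change is one line in the uses clause (as I understand it, the unit's initialization section redirects System.Move at startup, hence "nothing else to change"):

uses
  SysUtils, Classes, // your existing units
  fastmove;          // initialization code swaps in the fast Move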

Speed gain on my machine: almost 50% (it depends on the CPU you are using).

Original procedure

n         ms graph
---------------------------------
100000   1.8 *
200000   7.6 ***
300000  17.0 *******
400000  30.1 *************
500000  47.6 ********************

Improved, without fastmove (-7%)

n         ms graph
---------------------------------
100000   1.6 *
200000   6.9 ***
300000  15.7 ******
400000  28.2 ***********
500000  44.3 ******************

Improved, With fastmove (-46%)

n         ms graph
---------------------------------
100000   0.8 *
200000   3.8 **
300000   9.0 ****
400000  16.3 *******
500000  25.7 ***********

Last comments:

 if FCount = FCapacity
    then begin
     if FCapacity<100000
        then FCapacity:=100000  
        else FCapacity:=FCapacity*2;
     SetLength(FSortedList, FCapacity);
    end;

As I said, you can add some progressive FCapacity increase. This is a classical Grow implementation; doubling keeps the reallocation cost amortized constant per insert (just add more ifs if needed, or change 100000 to a more appropriate value).

D) Update 2: Array as ^TArray

type
  PIntegerArr3 = ^TIntegerArr3y;
  // declared as [0..1] but indexed far beyond that on purpose; this classic
  // unbounded-array trick requires range checking to be off ({$R-})
  TIntegerArr3y = array[0..1] of Integer;

var
 FCapacity3,
 FCount3: Integer;
 FSortedList3: PIntegerArr3;

procedure ResizeArr3(var aCurrentArr: PIntegerArr3; const aNewCapacity: Integer);
var lNewArr: PIntegerArr3;

begin
 GetMem(lNewArr, aNewCapacity*SizeOf(Integer));

 if FCount3>0 // copy data too
  then begin
    if aNewCapacity<FCount3
       then FCount3:=aNewCapacity; // shrink
    Move(aCurrentArr^[0], lNewArr^[0], FCount3*SizeOf(Integer));
  end;

 FreeMem(aCurrentArr, FCapacity3*SizeOf(Integer));
 FCapacity3:=aNewCapacity;
 aCurrentArr:=lNewArr;
end;

procedure FreeArr3;
begin
 if FCapacity3>0
  then begin
    FreeMem(FSortedList3, FCapacity3*SizeOf(Integer));
    FSortedList3:=nil;
    FCapacity3:=0; // reset so a second call cannot double-free
  end;
end;

procedure Insert3(const _Value, _InsertPos: integer);
begin
 if FCount3 = FCapacity3
    then ResizeArr3(FSortedList3, FCapacity3 + 100000);
 Move(FSortedList3^[_InsertPos], FSortedList3^[_InsertPos+1], SizeOf(Integer) * (FCount3 - _InsertPos));
 FSortedList3^[_InsertPos] := _Value;
 Inc(FCount3);
end;
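
A hypothetical driver for this version, mirroring the earlier worst-case benchmark (i and n are assumed to be Integer variables):

FCount3 := 0;
FCapacity3 := 0;
FSortedList3 := nil;   // ResizeArr3 allocates on the first insert
n := 500000;
for i := 0 to n - 1 do
  Insert3(i, 0);       // worst case: always insert at the front
FreeArr3;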

Speed gain over step C): none!

Conclusion: FastMove or an algorithm change; the "physical" limit of memory-move speed has been reached!

I'm using Delphi XE3 and in System.pas, line 5307:

(* ***** BEGIN LICENSE BLOCK *****
 *
 * The assembly function Move is licensed under the CodeGear license terms.
 *
 * The initial developer of the original code is Fastcode
 *
 * Portions created by the initial developer are Copyright (C) 2002-2004
 * the initial developer. All Rights Reserved.
 *
 * Contributor(s): John O'Harrow
 *
 * ***** END LICENSE BLOCK ***** *)

procedure Move(const Source; var Dest; Count: NativeInt);

So Delphi actually already ships with some Fastcode routines, but including the ones downloaded directly from their site (or from the link I included above) still made the biggest difference, almost 50% (strange).

answered Sep 29 '22 by david


Not to be a spoilsport, but the solution is already in the edit to my question:

After switching from an array to TurboPower's StColl, the performance no longer degrades with large arrays, and it is quite fast to boot. The run time is down from 2 hours to less than half an hour. The change was really simple. I wish I had remembered that library much earlier.

I needed the following files from the SourceForge repository (I didn't want to download the whole library):

  • StBase.pas
  • StColl.pas
  • StConst.pas
  • StList.pas
  • StDefine.inc

Actually I am surprised that there weren't more interdependencies. The TurboPower guys definitely knew their trade. I wonder what they are doing today, still programming gambling machines for casinos?

answered Sep 29 '22 by dummzeuch