Fast intersection of two sorted integer arrays

Tags: c#, .net, assembly

I need to find the intersection of two sorted integer arrays and do it very fast.

Right now, I am using the following code:

int i = 0, j = 0;

while (i < arr1.Count && j < arr2.Count)
{
    if (arr1[i] < arr2[j])
    {
        i++;
    }
    else
    {
        if (arr2[j] < arr1[i])
        {
            j++;
        }
        else 
        {
            intersect.Add(arr2[j]);
            j++;
            i++;
        }
    }
}

Unfortunately, it might take hours to do all the work.

How can I do it faster? I found this article where SIMD instructions are used. Is it possible to use SIMD in .NET?

What do you think about:

Mono.SIMD: http://docs.go-mono.com/index.aspx?link=N:Mono.Simd

NetASM (injects asm code into managed code): http://netasm.codeplex.com/

and something like http://www.atrevido.net/blog/PermaLink.aspx?guid=ac03f447-d487-45a6-8119-dc4fa1e932e1

 

EDIT:

When I say thousands, I mean the following (in code):

for(var i=0;i<arrCollection1.Count-1;i++)
{
    for(var j=i+1;j<arrCollection2.Count;j++)
    {
        Intersect(arrCollection1[i], arrCollection2[j]);
    }
}
Neir0 asked Jun 02 '12 23:06


1 Answer

UPDATE

The fastest I got was about 200 ms for two arrays of 10 million elements each, using the unsafe version (the last piece of code).

The test I ran:

var arr1 = new int[10000000];
var arr2 = new int[10000000];

for (var i = 0; i < 10000000; i++)
{
    arr1[i] = i;
    arr2[i] = i * 2;
}

var sw = Stopwatch.StartNew();

var result = arr1.IntersectSorted(arr2);

sw.Stop();

Console.WriteLine(sw.Elapsed); // 00:00:00.1926156

Full Post:

I've tested various ways to do it and found this to be very good:

public static List<int> IntersectSorted(this int[] source, int[] target)
{
    // Set initial capacity to a "full-intersection" size
    // This prevents multiple re-allocations
    var ints = new List<int>(Math.Min(source.Length, target.Length));

    var i = 0;
    var j = 0;

    while (i < source.Length && j < target.Length)
    {
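        // Compare only once and let compiler optimize the switch-case.
        // Note: this relies on Int32.CompareTo returning exactly -1, 0 or 1;
        // its documented contract only guarantees the sign of the result.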
        switch (source[i].CompareTo(target[j]))
        {
            case -1:
                i++;

                // Saves us a JMP instruction
                continue;
            case 1:
                j++;

                // Saves us a JMP instruction
                continue;
            default:
                ints.Add(source[i++]);
                j++;

                // Saves us a JMP instruction
                continue;
        }
    }

    // Free unused memory (sets capacity to actual count)
    ints.TrimExcess();

    return ints;
}
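
A quick way to sanity-check a merge-style intersection like the one above is to compare it against LINQ's Enumerable.Intersect on small inputs. The sketch below is only an illustration: the names IntersectCheck and MergeIntersect are made up, and it uses plain < and > comparisons instead of the switch on CompareTo. It assumes both arrays are sorted ascending and contain no duplicates (with duplicates, the merge approach and Enumerable.Intersect can legitimately differ).

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class IntersectCheck
{
    // Hypothetical helper: two-pointer intersection of two ascending arrays
    public static List<int> MergeIntersect(int[] source, int[] target)
    {
        var result = new List<int>();
        int i = 0, j = 0;

        while (i < source.Length && j < target.Length)
        {
            if (source[i] < target[j]) i++;        // source is behind, advance it
            else if (source[i] > target[j]) j++;   // target is behind, advance it
            else { result.Add(source[i]); i++; j++; } // match: keep it, advance both
        }

        return result;
    }

    public static void Main()
    {
        var a = new[] { 1, 3, 4, 7, 9 };
        var b = new[] { 2, 3, 7, 8, 9, 10 };

        var merged = MergeIntersect(a, b);
        var expected = a.Intersect(b).ToList();

        Console.WriteLine(string.Join(", ", merged));      // 3, 7, 9
        Console.WriteLine(merged.SequenceEqual(expected)); // True
    }
}
```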

For a further speedup you can remove the ints.TrimExcess(); call, which also makes a noticeable difference because trimming reallocates and copies the list's backing array. Consider first whether you need that unused memory back.

Also, if callers may break out of their loops over the intersection early, and you don't need the results as an array or list, change the implementation to an iterator:
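
To make the trade-off concrete, here is a small sketch (the class name TrimExcessDemo is made up for illustration) that pre-sizes a list for the worst case, as the method above does, and prints Capacity before and after trimming.

```csharp
using System;
using System.Collections.Generic;

public static class TrimExcessDemo
{
    public static void Main()
    {
        // Pre-size for the worst case, like the "full-intersection" capacity above
        var ints = new List<int>(1000);
        ints.Add(3);
        ints.Add(7);
        ints.Add(9);

        Console.WriteLine(ints.Capacity); // 1000

        // Shrinks the backing array to the actual element count
        ints.TrimExcess();

        Console.WriteLine(ints.Capacity); // 3
        Console.WriteLine(ints.Count);    // 3
    }
}
```

Skipping the trim keeps the oversized backing array alive for as long as the list is referenced, so it mostly pays off for short-lived results.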

public static IEnumerable<int> IntersectSorted(this int[] source, int[] target)
{
    var i = 0;
    var j = 0;

    while (i < source.Length && j < target.Length)
    {
        // Compare only once and let compiler optimize the switch-case
        switch (source[i].CompareTo(target[j]))
        {
            case -1:
                i++;

                // Saves us a JMP instruction
                continue;
            case 1:
                j++;

                // Saves us a JMP instruction
                continue;
            default:
                yield return source[i++];
                j++;

                // Saves us a JMP instruction
                continue;
        }
    }
}
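
To illustrate why the iterator helps when the consumer stops early, here is a small sketch. The names LazyIntersectDemo and IntersectSortedLazy and the Steps counter are hypothetical and exist only to make the amount of work visible. The loop uses plain comparisons rather than the switch on CompareTo.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class LazyIntersectDemo
{
    // Counts loop iterations so the early exit is observable (illustration only)
    public static int Steps;

    public static IEnumerable<int> IntersectSortedLazy(int[] source, int[] target)
    {
        var i = 0;
        var j = 0;

        while (i < source.Length && j < target.Length)
        {
            Steps++;

            if (source[i] < target[j]) i++;
            else if (source[i] > target[j]) j++;
            else
            {
                // Execution pauses here until the caller asks for the next item
                yield return source[i];
                i++;
                j++;
            }
        }
    }

    public static void Main()
    {
        var a = Enumerable.Range(0, 1000000).ToArray();                    // 0, 1, 2, ...
        var b = Enumerable.Range(0, 1000000).Select(x => x * 2).ToArray(); // 0, 2, 4, ...

        Steps = 0;
        var firstThree = IntersectSortedLazy(a, b).Take(3).ToList();

        Console.WriteLine(string.Join(", ", firstThree)); // 0, 2, 4
        Console.WriteLine(Steps); // only a handful of steps; the arrays are not fully scanned
    }
}
```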

Another improvement is to use unsafe code (this must be compiled with unsafe code enabled, e.g. the /unsafe compiler option):

public static unsafe List<int> IntersectSorted(this int[] source, int[] target)
{
    var ints = new List<int>(Math.Min(source.Length, target.Length));

    fixed (int* ptSrc = source)
    {
        var maxSrcAdr = ptSrc + source.Length;

        fixed (int* ptTar = target)
        {
            var maxTarAdr = ptTar + target.Length;

            var currSrc = ptSrc;
            var currTar = ptTar;

            while (currSrc < maxSrcAdr && currTar < maxTarAdr)
            {
                switch ((*currSrc).CompareTo(*currTar))
                {
                    case -1:
                        currSrc++;
                        continue;
                    case 1:
                        currTar++;
                        continue;
                    default:
                        ints.Add(*currSrc);
                        currSrc++;
                        currTar++;
                        continue;
                }
            }
        }
    }

    ints.TrimExcess();
    return ints;
}

In summary, the biggest performance hit was in the if-else chain. Turning it into a switch-case made a huge difference (about 2 times faster).
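
Speedups like this depend on the runtime and JIT version, so it is worth measuring on your own setup. Below is a rough timing sketch, not a rigorous benchmark. The names BranchBenchmark, IntersectIfElse, IntersectSwitch and BestOf are hypothetical. The input data is the same as in the test at the top of this answer, and each variant is run once as a warm-up so that JIT compilation is not part of the measured time.

```csharp
using System;
using System.Collections.Generic;
using System.Diagnostics;

public static class BranchBenchmark
{
    public static List<int> IntersectIfElse(int[] source, int[] target)
    {
        var ints = new List<int>(Math.Min(source.Length, target.Length));
        int i = 0, j = 0;
        while (i < source.Length && j < target.Length)
        {
            if (source[i] < target[j]) i++;
            else if (target[j] < source[i]) j++;
            else { ints.Add(source[i]); i++; j++; }
        }
        return ints;
    }

    public static List<int> IntersectSwitch(int[] source, int[] target)
    {
        var ints = new List<int>(Math.Min(source.Length, target.Length));
        int i = 0, j = 0;
        while (i < source.Length && j < target.Length)
        {
            // Assumes CompareTo returns exactly -1, 0 or 1, as the answer's code does
            switch (source[i].CompareTo(target[j]))
            {
                case -1: i++; continue;
                case 1: j++; continue;
                default: ints.Add(source[i++]); j++; continue;
            }
        }
        return ints;
    }

    // Runs the delegate once to warm up, then returns the best of `runs` timings
    public static TimeSpan BestOf(int runs, Func<List<int>> action)
    {
        action();
        var best = TimeSpan.MaxValue;
        for (var r = 0; r < runs; r++)
        {
            var sw = Stopwatch.StartNew();
            action();
            sw.Stop();
            if (sw.Elapsed < best) best = sw.Elapsed;
        }
        return best;
    }

    public static void Main()
    {
        const int n = 10000000;
        var arr1 = new int[n];
        var arr2 = new int[n];
        for (var i = 0; i < n; i++) { arr1[i] = i; arr2[i] = i * 2; }

        Console.WriteLine("if-else: " + BestOf(5, () => IntersectIfElse(arr1, arr2)));
        Console.WriteLine("switch:  " + BestOf(5, () => IntersectSwitch(arr1, arr2)));
    }
}
```

Run it in a Release build without a debugger attached; otherwise the numbers say little about optimized code.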

SimpleVar answered Oct 17 '22 13:10