
Multi-threading on a foreach loop?

I want to process some data. I have about 25k items in a Dictionary. In a foreach loop, I query a database to get results for each item. The results are stored as that item's value in the Dictionary.

foreach (KeyValuePair<string, Type> pair in allPeople)
{
    MySqlCommand comd = new MySqlCommand("SELECT * FROM `logs` WHERE IP = '" + pair.Key + "' GROUP BY src", con);
    MySqlDataReader reader2 = comd.ExecuteReader();
    Dictionary<string, Dictionary<int, Log>> allViews = new Dictionary<string, Dictionary<int, Log>>();
    while (reader2.Read())
    {
        if (!allViews.ContainsKey(reader2.GetString("src")))
        {
            allViews.Add(reader2.GetString("src"), reader2.GetInt32("time"));
        }
    }
    reader2.Close();
    reader2.Dispose();
    allPeople[pair.Key].View = allViews;
}

I was hoping to be able to do this faster by multi-threading. I have 8 threads available, and CPU usage is about 13%. I just don't know if it will work because it's relying on the MySQL server. On the other hand, maybe 8 threads would open 8 DB connections, and so be faster.

Anyway, if multi-threading would help in my case, how? o.O I've never worked with (multiple) threads, so any help would be great :D

lordstyx asked Dec 28 '22 11:12


2 Answers

MySqlDataReader is stateful - you call Read() on it and it moves to the next row - so each thread needs its own reader, and you need to construct the queries so that each thread fetches different rows. That shouldn't be hard, as you naturally have many queries with different values of pair.Key.

You also need to either have a temp dictionary per thread, and then merge them, or use a lock to prevent concurrent modification of the dictionary.

The above assumes that MySQL will allow a single connection to perform concurrent queries; otherwise you may need multiple connections too.
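
The per-thread dictionary option can be sketched without a database. This is a minimal sketch, assuming .NET 4 or later; MergeSketch and FakeLookup are hypothetical names, and FakeLookup merely stands in for the per-IP query. Each worker fills a private dictionary, and the merge into the shared one happens once per worker under a lock:

```csharp
using System.Collections.Generic;
using System.Threading.Tasks;

public static class MergeSketch
{
    // Hypothetical stand-in for the per-IP database query.
    static int FakeLookup(string ip)
    {
        return ip.Length;
    }

    public static Dictionary<string, int> Collect(IEnumerable<string> ips)
    {
        var merged = new Dictionary<string, int>();
        var mergeLock = new object();

        Parallel.ForEach<string, Dictionary<string, int>>(
            ips,
            // localInit: a private dictionary per worker, so nothing is shared while filling it
            () => new Dictionary<string, int>(),
            // body: runs once per item and only touches the worker's own dictionary
            (ip, loopState, local) =>
            {
                local[ip] = FakeLookup(ip);
                return local;
            },
            // localFinally: runs once per worker; the only place the shared dictionary is written
            local =>
            {
                lock (mergeLock)
                {
                    foreach (KeyValuePair<string, int> entry in local)
                    {
                        merged[entry.Key] = entry.Value;
                    }
                }
            });

        return merged;
    }
}
```

Calling MergeSketch.Collect(new[] { "10.0.0.1", "10.0.0.22" }) returns a dictionary with one entry per address. The lock is taken once per worker rather than once per item, which is the main attraction of this variant.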

First though, I'd see what happens if you only ask the database for the data you need ("SELECT src,time FROM `logs` WHERE IP = '" + pair.Key + "' GROUP BY src") and use GetString(0) and GetInt32(1) instead of using the names to look up the src and time; also only get the values once from the result.

I'm also not sure about the logic - you are not ordering the log events by time, so which one is returned first (and so is stored in the dictionary) could be any of them.

Something like this logic - where each of N threads operates only on every Nth pair, each thread has its own reader, and nothing changes allPeople itself, only the properties of the values in allPeople:

    private void RunSubQuery(Dictionary<string, Type> allPeople, MySqlConnection con, int threadNumber, int threadCount)
    {
        int hoppity = 0; // used to hop over the keys not processed by this thread

        foreach (var pair in allPeople)
        {
            // each of the threadCount threads processes only every threadCount-th key, offset by its threadNumber
            if ((hoppity % threadCount) == threadNumber)
            {
                // you may need con per thread, or it might be that you can share con; I don't know
                MySqlCommand comd = new MySqlCommand("SELECT src,time FROM `logs` WHERE IP = '" + pair.Key + "' GROUP BY src", con);

                using (MySqlDataReader reader = comd.ExecuteReader())
                {
                    var allViews = new Dictionary<string, Dictionary<int, Log>>();

                    while (reader.Read())
                    {
                        string src = reader.GetString(0);
                        int time = reader.GetInt32(1);

                        // do whatever to allViews with src and time
                    }

                    // no thread will be modifying the same pair.Value, so this is safe
                    pair.Value.View = allViews;
                }
            }

            ++hoppity;
        }
    }

This isn't tested - I don't have MySQL on this machine, nor do I have your database and the other types you're using. It's also rather procedural (kind of how you would do it in Fortran with OpenMPI) rather than wrapping everything up in task objects.

You could launch threads for this like so:

    void RunQuery(Dictionary<string, Type> allPeople, MySqlConnection connection)
    {
        lock (allPeople)
        {
            const int threadCount = 8; // the number of threads

            // if it takes 18 seconds currently and you're not at .net 4 yet, then you may as well create
            // the threads here as any saving of using a pool will not matter against 18 seconds
            //
            // it could be more efficient to use a pool so that each thread takes a pair off of 
            // a queue, as doing it this way means that each thread has the same number of pairs to process,
            // and some pairs might take longer than others
            Thread[] threads = new Thread[threadCount];

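            for (int threadNumber = 0; threadNumber < threadCount; ++threadNumber)
            {
                // copy the loop variable: the lambda captures the variable, not its value, so
                // without this copy a thread could see a later value of threadNumber
                int capturedNumber = threadNumber;
                threads[threadNumber] = new Thread(new ThreadStart(() => RunSubQuery(allPeople, connection, capturedNumber, threadCount)));
                threads[threadNumber].Start();
            }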

            // wait for all threads to finish
            for (int threadNumber = 0; threadNumber < threadCount; ++threadNumber)
            {
                threads[threadNumber].Join();
            }
        }
    }

The extra lock held on allPeople is done so that there is a write barrier after all the threads return; I'm not quite sure if it's needed. Any object would do.

Nothing in this guarantees any performance gain - it might be that the MySQL libraries are single threaded, but the server certainly can handle multiple connections. Measure with various numbers of threads.


If you're using .NET 4, then you don't have to mess around creating the threads or skipping the items you aren't working on:

    // this time using .net 4 parallel; assumes that connection is thread safe
    static void RunQuery(Dictionary<string, Type> allPeople, MySqlConnection connection)
    {
        Parallel.ForEach(allPeople, pair => RunPairQuery(pair, connection));
    }

    private static void RunPairQuery(KeyValuePair<string, Type> pair, MySqlConnection connection)
    {
        MySqlCommand comd = new MySqlCommand("SELECT src,time FROM `logs` WHERE IP = '" + pair.Key + "' GROUP BY src", connection);

        using (MySqlDataReader reader = comd.ExecuteReader())
        {
            var allViews = new Dictionary<string, Dictionary<int, Log>>();

            while (reader.Read())
            {
                string src = reader.GetString(0);
                int time = reader.GetInt32(1);

                // do whatever to allViews with src and time
            }

            // no iteration will be modifying the same pair.Value, so this is safe
            pair.Value.View = allViews;
        }
    }
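
If the connection turns out not to be shareable (ADO.NET connection objects are generally not safe for concurrent use, and a connection usually allows only one open reader at a time), each worker can open its own. A minimal sketch, assuming a connectionString is available instead of an already-open connection; Person is a hypothetical stand-in for the value type, with View simplified to a Dictionary<string, int>, and maxWorkers is an assumed parameter. It also passes the IP as a query parameter rather than concatenating it into the SQL:

```csharp
using System.Collections.Generic;
using System.Threading.Tasks;
using MySql.Data.MySqlClient;

// Hypothetical stand-in for the value type stored in allPeople.
public class Person
{
    public Dictionary<string, int> View { get; set; }
}

public static class PerWorkerConnectionSketch
{
    public static void RunQuery(Dictionary<string, Person> allPeople, string connectionString, int maxWorkers)
    {
        // limits how many workers run at once
        var options = new ParallelOptions { MaxDegreeOfParallelism = maxWorkers };

        Parallel.ForEach<KeyValuePair<string, Person>, MySqlConnection>(
            allPeople,
            options,
            // localInit: each worker opens its own connection
            () =>
            {
                var connection = new MySqlConnection(connectionString);
                connection.Open();
                return connection;
            },
            // body: runs per pair, using only this worker's connection
            (pair, loopState, connection) =>
            {
                var views = new Dictionary<string, int>();
                using (var command = new MySqlCommand(
                    "SELECT src, time FROM `logs` WHERE IP = @ip GROUP BY src", connection))
                {
                    command.Parameters.AddWithValue("@ip", pair.Key);
                    using (MySqlDataReader reader = command.ExecuteReader())
                    {
                        while (reader.Read())
                        {
                            views[reader.GetString(0)] = reader.GetInt32(1);
                        }
                    }
                }
                pair.Value.View = views; // each pair is handled by exactly one worker
                return connection;
            },
            // localFinally: dispose of the worker's connection when it has no more pairs
            connection => connection.Dispose());
    }
}
```

Whether this beats the single-threaded loop still depends on the server, so it is worth timing with a few different values of maxWorkers.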

Pete Kirkham answered Jan 12 '23 22:01


The biggest problem that comes to mind is that you are going to use multithreading to add values to a dictionary, which isn't thread safe.

You'll have to lock the dictionary around each write to make it work, and you might not get much benefit from implementing it this way, as each thread still has to take the lock on the dictionary object to add a value.
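
A minimal sketch of what that locking might look like, with ConcurrentDictionary (available from .NET 4) shown as an alternative. SharedDictionarySketch and Compute are hypothetical names; Compute stands in for the slow per-item database query:

```csharp
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public static class SharedDictionarySketch
{
    // Hypothetical stand-in for the slow per-item work (the database query).
    static int Compute(int n)
    {
        return n * n;
    }

    // Option 1: a plain Dictionary guarded by a lock.
    public static Dictionary<int, int> FillWithLock(int itemCount, int threadCount)
    {
        var results = new Dictionary<int, int>();
        var gate = new object();
        var threads = new Thread[threadCount];

        for (int t = 0; t < threadCount; ++t)
        {
            int threadNumber = t; // copy so each lambda captures its own value
            threads[t] = new Thread(() =>
            {
                for (int i = threadNumber; i < itemCount; i += threadCount)
                {
                    int value = Compute(i); // slow part runs outside the lock
                    lock (gate)
                    {
                        results[i] = value; // only the write is serialized
                    }
                }
            });
            threads[t].Start();
        }

        foreach (Thread thread in threads)
        {
            thread.Join();
        }
        return results;
    }

    // Option 2: ConcurrentDictionary handles its own synchronization.
    public static ConcurrentDictionary<int, int> FillConcurrent(int itemCount)
    {
        var results = new ConcurrentDictionary<int, int>();
        Parallel.For(0, itemCount, i => { results[i] = Compute(i); });
        return results;
    }
}
```

Since the lock is held only for the write itself, the waiting is likely small next to a database round trip, but that is worth measuring rather than assuming.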

kemiller2002 answered Jan 12 '23 23:01