 

Multithreading task to process files in C#

I've been reading a lot about threading, but I can't figure out how to solve my issue. First, let me introduce the problem. I have files that need to be processed. The hostnames and file paths are stored in two arrays, matched by index.

Now I want to set up several threads to process the files. The number of threads to create is based on three factors:
A) The maximum thread count can never exceed the number of unique hostnames.
B) Files with the same hostname MUST be processed sequentially, i.e. we cannot process host1_file1 and host1_file2 at the same time. (Data integrity would be put at risk, and this is beyond my control.)
C) The user may throttle the number of threads available for processing. The number of threads is still limited by condition A. This is purely because, if we had a large number of hosts (say 50), we might not want 50 threads processing at the same time.

In this example there are six distinct hosts (host1 to host6), so a maximum of 6 threads can be created.
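Rules A and C reduce to taking the minimum of two numbers. A minimal sketch, assuming the user's throttle arrives as a plain int (the class and method names are hypothetical):

```csharp
using System;
using System.Linq;

public static class ThreadBudget
{
    // Effective number of worker threads:
    // never more than the number of distinct hosts (rule A)
    // and never more than the user's throttle (rule C).
    public static int EffectiveThreadCount(string[] hostnames, int userLimit)
    {
        if (userLimit < 1)
            throw new ArgumentOutOfRangeException(nameof(userLimit), "Must allow at least one thread.");

        int distinctHosts = hostnames.Distinct().Count();
        return Math.Min(userLimit, distinctHosts);
    }
}
```

With the ten hostnames from the question, a throttle of 50 gives 6 threads and a throttle of 4 gives 4. Rule B does not affect the count; it is satisfied by handing all of a host's files to a single worker that processes them in order.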

The optimal processing routine is shown below.

[Image: optimal processing routine]


using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public class file_prep_obj
{
    public string[] file_paths;
    public string[] hostname;
    public Dictionary<string, int> my_dictionary;

    public void get_files()
    {
        // hostname[i] is the host that file_paths[i] belongs to, so both arrays must have the same length.
        hostname = new string[]{ "host1", "host1", "host1", "host2", "host2", "host3", "host4","host4","host5","host6" };
        file_paths=new string[]{"C:\\host1_file1","C:\\host1_file2","C:\\host1_file3","C:\\host2_file1","C:\\host2_file2",
                                "C:\\host3_file1","C:\\host4_file1","C:\\host4_file2","C:\\host5_file1","C:\\host6_file1"};
        //The dictionary provides a count on the number of files that need to be processed for a particular host.
        my_dictionary = hostname.GroupBy(x => x)
                        .ToDictionary(g => g.Key,
                        g => g.Count());
    }
}

//This class contains a list of file_paths associated with the same host.
//The group_file_host_name will be the same for a host.
class host_file_thread
{
    public string[] group_file_paths;
    public string[] group_file_host_name;

    public void process_file(string file_path_in)
    {
        var time_delay_random=new Random();
        Console.WriteLine("Started processing File: " + file_path_in);
        Task.Delay(time_delay_random.Next(3000)+1000);   // NOTE: the returned task is not awaited, so no delay actually happens here
        Console.WriteLine("Completed processing File: " + file_path_in);
    }
}

class Program
{
    static void Main(string[] args)
    {
        file_prep_obj my_files=new file_prep_obj();
        my_files.get_files();
        //Create our host objects... my_files.my_dictionary.Count represents the max number of threads
        host_file_thread[] host_thread=new host_file_thread[my_files.my_dictionary.Count];

        int key_pair_count=0;
        int file_path_position=0;
        foreach (KeyValuePair<string, int> pair in my_files.my_dictionary)
        {
            host_thread[key_pair_count] = new host_file_thread();   //Each slot must be initialised: new host_file_thread[n] only allocates null references
            host_thread[key_pair_count].group_file_paths=new string[pair.Value];        //Initialise the group_file_paths
            host_thread[key_pair_count].group_file_host_name=new string[pair.Value];    //Initialise the group_file_host_name


            for(int j=0;j<pair.Value;j++)
            {
                host_thread[key_pair_count].group_file_host_name[j]=pair.Key.ToString();                        //Group the hosts
                host_thread[key_pair_count].group_file_paths[j]=my_files.file_paths[file_path_position];        //Group the file_paths
                file_path_position++;
            }
            key_pair_count++;
        }//Close foreach (KeyValuePair<string, int> pair in my_files.my_dictionary)

        //TODO PROCESS FILES USING host_thread objects. 
    }//Close static void Main(string[] args)
}//Close Class Program
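The grouping loop in Main relies on the two arrays being sorted by host and having the same length; if either assumption breaks, files end up attached to the wrong host. A minimal alternative sketch (GroupByHost is a hypothetical helper, not part of the question) pairs the arrays by index with LINQ's Zip and then groups the pairs by host, so the order of the arrays no longer matters:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class HostGrouping
{
    // Pairs hostnames[i] with filePaths[i] and groups the pairs by host.
    // Within each host, files keep the order in which they appear in filePaths.
    public static Dictionary<string, List<string>> GroupByHost(string[] hostnames, string[] filePaths)
    {
        // Zip silently stops at the shorter array, so check the lengths explicitly.
        if (hostnames.Length != filePaths.Length)
            throw new ArgumentException("Every file path needs exactly one hostname.");

        return hostnames
            .Zip(filePaths, (host, path) => new { host, path })
            .GroupBy(p => p.host)
            .ToDictionary(g => g.Key, g => g.Select(p => p.path).ToList());
    }
}
```

Each dictionary entry is then one unit of work for a single thread: the host as the key and its files, to be processed in order, as the value.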



What I'm after is a guide on how to code the threaded processing routine so that it meets the specs above.

Asked by Peter H on Jul 08 '14 05:07




3 Answers

You can use Stephen Toub's ForEachAsync extension method to process the files. It lets you specify the maximum degree of parallelism (how many items are processed at once), and it is non-blocking, so it frees up your main thread to do other processing. Here is the method from the article:

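using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class ForEachAsyncExtensions
{
    // Splits source into dop partitions; each partition is drained sequentially
    // on its own task, so at most dop bodies run at the same time.
    public static Task ForEachAsync<T>(this IEnumerable<T> source, int dop, Func<T, Task> body)
    {
        return Task.WhenAll(
            from partition in Partitioner.Create(source).GetPartitions(dop)
            select Task.Run(async delegate
            {
                using (partition)
                    while (partition.MoveNext())
                        await body(partition.Current);
            }));
    }
}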
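To check that dop really bounds how many bodies run at once, the sketch below reuses the same method body (written as a plain static method so it cannot clash with the extension above) and records the peak number of bodies in flight. MeasurePeakConcurrency and the 20 ms delay are illustrative assumptions, not part of the article:

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class ForEachAsyncDemo
{
    // Same body as ForEachAsync above: dop partitions, each drained sequentially.
    public static Task ForEachAsync<T>(IEnumerable<T> source, int dop, Func<T, Task> body)
    {
        return Task.WhenAll(
            from partition in Partitioner.Create(source).GetPartitions(dop)
            select Task.Run(async delegate
            {
                using (partition)
                    while (partition.MoveNext())
                        await body(partition.Current);
            }));
    }

    // Runs itemCount dummy bodies and returns the highest number that were
    // ever running at the same moment.
    public static int MeasurePeakConcurrency(int itemCount, int dop)
    {
        int running = 0, peak = 0;
        ForEachAsync(Enumerable.Range(0, itemCount), dop, async _ =>
        {
            int now = Interlocked.Increment(ref running);
            // Raise peak to now with a compare-and-swap loop (other bodies may race us).
            int seen;
            while (now > (seen = Volatile.Read(ref peak)) &&
                   Interlocked.CompareExchange(ref peak, now, seen) != seen) { }
            await Task.Delay(20);
            Interlocked.Decrement(ref running);
        }).Wait();
        return peak;
    }
}
```

For the host problem, each item passed to the body is one host together with all its files, so dop is the maximum number of hosts being worked on at any moment.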

To use it, I refactored your code slightly. I changed the dictionary to type Dictionary<string, List<string>>: the key is the host and the value is the list of that host's file paths. I assumed each file path contains its host name.

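   // Cross-join every hostname with every path and keep the pairs where the path mentions the host.
   // hostname has one entry per file (so hosts repeat), which is why Distinct() is needed on the paths.
   // Caveat: Contains is a substring test, so "host1" would also match a path containing "host10".
   my_dictionary = (from h in hostname
                    from f in file_paths
                    where f.Contains(h)
                    select new { Hostname = h, File = f }).GroupBy(x => x.Hostname)
                    .ToDictionary(x => x.Key, x => x.Select(s => s.File).Distinct().ToList());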

I also made your process_file method async. Task.Delay only returns a task that completes after the given time; unless you await it, the method carries on immediately and no delay happens.
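A quick way to see the difference is to time both variants with a Stopwatch. This is an illustrative sketch; the 500 ms delay and the method names are arbitrary:

```csharp
using System.Diagnostics;
using System.Threading.Tasks;

public static class DelayDemo
{
    // Calls Task.Delay but discards the returned task, as the question's process_file does.
    public static long ElapsedWithoutAwait()
    {
        var sw = Stopwatch.StartNew();
        Task.Delay(500); // the delay task is created, but nothing waits for it
        return sw.ElapsedMilliseconds;
    }

    // Awaits the delay, as the refactored process_file does.
    public static async Task<long> ElapsedWithAwait()
    {
        var sw = Stopwatch.StartNew();
        await Task.Delay(500);
        return sw.ElapsedMilliseconds;
    }
}
```

ElapsedWithoutAwait returns almost immediately, while ElapsedWithAwait takes at least about 500 ms, because only the awaited version actually waits for the delay task to complete.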

public static async Task process_file(string file_path_in)
{
    var time_delay_random = new Random();
    Console.WriteLine("Started:{0} ThreadId:{1}", file_path_in, Thread.CurrentThread.ManagedThreadId);
    await Task.Delay(time_delay_random.Next(3000) + 1000);
    Console.WriteLine("Completed:{0} ThreadId:{1}", file_path_in, Thread.CurrentThread.ManagedThreadId);
}

To use the code, work out the maximum number of threads you want to use and pass it to my_files.my_dictionary.ForEachAsync. You also supply an asynchronous delegate that processes the files for a particular host, awaiting each one before starting the next so they are processed sequentially.

public static async Task MainAsync()
{
    var my_files = new file_prep_obj();
    my_files.get_files();

    const int userSuppliedMaxThread = 5;  // rule C: user throttle
    var maxThreads = Math.Min(userSuppliedMaxThread, my_files.my_dictionary.Count);  // rule A: at most one thread per host
    Console.WriteLine("MaxThreads = " + maxThreads);

    foreach (var pair in my_files.my_dictionary)
    {
        foreach (var path in pair.Value)
        {
            Console.WriteLine("Key= {0}, Value={1}", pair.Key, path);   
        }            
    }

    await my_files.my_dictionary.ForEachAsync(maxThreads, async (pair) =>
    {
        foreach (var path in pair.Value)
        {
            // serially process each path for a particular host.
            await process_file(path);
        }
    });

}

static void Main(string[] args)
{
    MainAsync().Wait();
    Console.ReadKey();

}//Close static void Main(string[] args)

Output

MaxThreads = 5
Key= host1, Value=C:\host1_file1
Key= host1, Value=C:\host1_file2
Key= host1, Value=C:\host1_file3
Key= host2, Value=C:\host2_file1
Key= host2, Value=C:\host2_file2
Key= host3, Value=C:\host3_file1
Key= host4, Value=C:\host4_file1
Key= host4, Value=C:\host4_file2
Key= host5, Value=C:\host5_file1
Key= host6, Value=C:\host6_file1
Started:C:\host1_file1 ThreadId:10
Started:C:\host2_file1 ThreadId:12
Started:C:\host3_file1 ThreadId:13
Started:C:\host4_file1 ThreadId:11
Started:C:\host5_file1 ThreadId:10
Completed:C:\host1_file1 ThreadId:13
Completed:C:\host2_file1 ThreadId:12
Started:C:\host1_file2 ThreadId:13
Started:C:\host2_file2 ThreadId:12
Completed:C:\host2_file2 ThreadId:11
Completed:C:\host1_file2 ThreadId:13
Started:C:\host6_file1 ThreadId:11
Started:C:\host1_file3 ThreadId:13
Completed:C:\host5_file1 ThreadId:11
Completed:C:\host4_file1 ThreadId:12
Completed:C:\host3_file1 ThreadId:13
Started:C:\host4_file2 ThreadId:12
Completed:C:\host1_file3 ThreadId:11
Completed:C:\host6_file1 ThreadId:13
Completed:C:\host4_file2 ThreadId:12
Answered by NeddySpaghetti on Sep 30 '22 11:09


I would start by organizing your data structure a bit better. Having two separate arrays not only duplicates data (the host name is repeated for every file) but also creates an implicit coupling (element i of one array belongs to element i of the other), which may not be obvious to someone reading your code.

A class which would hold information about a single task might look something like:

using System.Collections.Generic;
using System.Collections.ObjectModel;
using System.Linq;

public class TaskInfo
{
    private readonly string _hostName;
    public string HostName
    {
        get { return _hostName; }
    }

    private readonly ReadOnlyCollection<string> _files;
    public ReadOnlyCollection<string> Files
    {
        get { return _files; }
    }

    public TaskInfo(string host, IEnumerable<string> files)
    {
        _hostName = host;
        _files = new ReadOnlyCollection<string>(files.ToList());
    }
}

Creating a list of tasks is now much more straightforward:

var list = new List<TaskInfo>()
{
    new TaskInfo(
        host: "host1",
        files: new[] { @"c:\host1\file1.txt", @"c:\host1\file2.txt" }),

    new TaskInfo(
        host: "host2",
        files: new[] { @"c:\host2\file1.txt", @"c:\host2\file2.txt" })

    /* ... */
};

Now that your tasks are ready, you can use the classes in the System.Threading.Tasks namespace to run them in parallel. To limit the number of concurrent tasks, set the MaxDegreeOfParallelism property:

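Parallel.ForEach(
    list,
    new ParallelOptions() { MaxDegreeOfParallelism = 10 },  // user throttle
    // Process (not shown) should handle taskInfo.Files one after another,
    // so that files for the same host are never processed concurrently.
    taskInfo => Process(taskInfo)
);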
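Process is not defined in this answer. A self-contained sketch of what it could look like follows; HostFiles is a simplified, hypothetical stand-in for TaskInfo, and Thread.Sleep stands in for the real per-file work. The degree of parallelism is capped by both the user's limit and the number of hosts:

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Simplified, hypothetical stand-in for TaskInfo: one host and its files.
public sealed class HostFiles
{
    public string HostName { get; }
    public IReadOnlyList<string> Files { get; }

    public HostFiles(string host, IEnumerable<string> files)
    {
        HostName = host;
        Files = files.ToList();
    }
}

public static class ParallelHosts
{
    // Processes hosts in parallel (at most userLimit at a time) while each
    // host's files are handled one after another. Returns the files in the
    // order they finished, which is only used for checking.
    public static List<string> Run(IList<HostFiles> hosts, int userLimit)
    {
        var finished = new ConcurrentQueue<string>();
        var options = new ParallelOptions
        {
            // User throttle, capped by the number of hosts.
            MaxDegreeOfParallelism = Math.Max(1, Math.Min(userLimit, hosts.Count))
        };

        Parallel.ForEach(hosts, options, host =>
        {
            // Sequential loop: two files of the same host never overlap.
            foreach (var file in host.Files)
            {
                Thread.Sleep(10); // hypothetical placeholder for the real per-file work
                finished.Enqueue(file);
            }
        });

        return finished.ToList();
    }
}
```

Because each Parallel.ForEach iteration handles one whole host, the inner foreach keeps that host's files in order, while different hosts still run in parallel.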

If you wanted to create your own pool of threads, you could also achieve a similar result with a ConcurrentQueue and multiple consumer threads, possibly waiting on a list of WaitHandles to know when they're done.
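A minimal sketch of that queue-based variant, assuming the files are already grouped into a Dictionary<string, List<string>> (host to file paths). It waits for the workers with Thread.Join rather than WaitHandles, and the per-file work is a hypothetical placeholder:

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;

public static class HostQueueWorkers
{
    // Each queue item is one host with all of its files, so a host is only
    // ever handled by a single consumer thread.
    public static ConcurrentBag<string> Run(
        IEnumerable<KeyValuePair<string, List<string>>> hosts, int userLimit)
    {
        var queue = new ConcurrentQueue<KeyValuePair<string, List<string>>>(hosts);
        var processed = new ConcurrentBag<string>();
        int workerCount = Math.Max(1, Math.Min(userLimit, queue.Count));

        var workers = Enumerable.Range(0, workerCount).Select(_ => new Thread(() =>
        {
            // The queue is fully populated up front, so an empty queue means no more work.
            while (queue.TryDequeue(out var host))
            {
                foreach (var file in host.Value)
                    processed.Add(host.Key + ":" + file); // hypothetical per-file work
            }
        })).ToList();

        workers.ForEach(t => t.Start());
        workers.ForEach(t => t.Join()); // Thread.Join used here instead of WaitHandles
        return processed;
    }
}
```

Queuing whole hosts rather than individual files is what keeps the same-host files sequential: a given host can only be dequeued once, so only one thread ever touches its files.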

Answered by Groo on Sep 30 '22 12:09


I was playing around with your problem and came up with the following approach. It might not be the best, but I believe it suits your needs.

Before we begin, I'm a big fan of extension methods, so here is one:

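using System;
using System.Collections.Generic;

public static class IEnumerableExtensions
{
    // Like foreach, but also passes the zero-based index of each item to the action.
    public static void Each<T>(this IEnumerable<T> ie, Action<T, int> action)
    {
        var i = 0;
        foreach (var e in ie) action(e, i++);
    }
}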

This loops over a collection (like foreach) but passes both the item and its index to the action. You'll see why this is needed later.

Then we have the variables.

// Note: despite its name, this array holds the host of each file.
public static string[] group_file_paths =
{
    "host1", "host1", "host1", "host2", "host2", "host3", "host4", "host4",
    "host5", "host6"
};

// Note: despite its name, this array holds the file paths, aligned by index with the hosts above.
public static string[] group_file_host_name =
{
    @"c:\host1_file1", @"c:\host1_file2", @"c:\host1_file3", @"c:\host2_file1", @"c:\host2_file2", @"c:\host3_file1",
    @"c:\host4_file1", @"c:\host4_file2", @"c:\host5_file1", @"c:\host6_file1"
};

Then the main code:

public static void Main(string[] args)
{
    Dictionary<string, List<string>> filesToProcess = new Dictionary<string, List<string>>();

    // Loop over the two arrays and build a dictionary with the host as the key and all of that host's file names as the value.
    group_file_paths.Each((host, hostIndex) =>
    {
        if (filesToProcess.ContainsKey(host))       
        { filesToProcess[host].Add(group_file_host_name[hostIndex]); }
        else
        {
            filesToProcess.Add(host, new List<string>());
            filesToProcess[host].Add(group_file_host_name[hostIndex]);
        }
    });

    var tasks = new List<Task>();

    foreach (var kvp in filesToProcess)
    {
        tasks.Add(Task.Factory.StartNew(() => 
        {
            foreach (var file in kvp.Value)
            {
                process_file(kvp.Key, file);
            }
        }));
    }

    var handleTaskCompletionTask = Task.WhenAll(tasks);
    handleTaskCompletionTask.Wait();
}

Some explanation might be needed here:

First, I create a dictionary with each host as the key and, as the value, the list of files that need to be processed for that host.

With the arrays above, your dictionary will look like:

  • host1
    • file 1
    • file 2
    • file 3
  • host2
    • file 1
    • file 2
  • host3
    • file 1
  • host4
    • file 1
    • file 2
  • host5
    • file 1
  • host6
    • file 1

Next, I create a collection of tasks using the TPL (Task Parallel Library): one task per host, and each task processes that host's files sequentially. All the tasks start immediately, and Main waits for all of them to finish.

For testing purposes, the process method looks like this:

    public static void process_file(string host, string file)
    {
        var time_delay_random = new Random();
        Console.WriteLine("Host '{0}' - Started processing the file {1}.", host, file);
        Thread.Sleep(time_delay_random.Next(3000) + 1000);
        Console.WriteLine("Host '{0}' - Completed processing the file {1}.", host, file);
        Console.WriteLine("");
    }

This post does not include a way to limit the number of threads yourself, but that can be added with a completion handler on the tasks: whenever a task completes, loop over your collection again and start a task for a host that hasn't been processed yet.
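One way to add that throttle is sketched below. Instead of a completion handler (ContinueWith), it uses a Task.WhenAny loop, which achieves the same thing: whenever a host's task finishes, the next waiting host is started. ThrottledHosts.Run and its processFile parameter are hypothetical names:

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class ThrottledHosts
{
    // Runs at most maxConcurrent host tasks at a time; each time one finishes
    // (detected with Task.WhenAny), the next host waiting in line is started.
    // Returns the peak number of host tasks that were running at once.
    public static int Run(Dictionary<string, List<string>> filesToProcess, int maxConcurrent,
                          Action<string, string> processFile)
    {
        if (maxConcurrent < 1)
            throw new ArgumentOutOfRangeException(nameof(maxConcurrent));

        var pending = new Queue<KeyValuePair<string, List<string>>>(filesToProcess);
        var running = new List<Task>();
        int peak = 0;

        while (pending.Count > 0 || running.Count > 0)
        {
            // Fill the free slots with waiting hosts.
            while (running.Count < maxConcurrent && pending.Count > 0)
            {
                var kvp = pending.Dequeue();
                running.Add(Task.Run(() =>
                {
                    foreach (var file in kvp.Value)
                        processFile(kvp.Key, file); // files of one host run sequentially
                }));
            }
            peak = Math.Max(peak, running.Count);

            // Block until any host finishes, then free its slot.
            var finished = Task.WhenAny(running).Result;
            running.Remove(finished);
            finished.Wait(); // rethrows if that host's processing failed
        }
        return peak;
    }
}
```

With the answer's code, the call would be something like ThrottledHosts.Run(filesToProcess, 5, process_file), since process_file(string host, string file) matches Action<string, string>.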

I hope this helps.

Answered by Complexity on Sep 30 '22 10:09