I'm still in the process of wrapping my brain around how concurrency works in Java. I understand that (if you're subscribing to the OO Java 5 concurrency model) you implement a Task
or Callable
with a run()
or call()
method (respectively), and it behooves you to parallelize as much of that implemented method as possible.
But I'm still not understanding something inherent about concurrent programming in Java:
Task
's run()
method assigned the right amount of concurrent work to be performed?As a concrete example, what if I have an I/O-bound readMobyDick()
method that reads the entire contents of Herman Melville's Moby Dick into memory from a file on the local system. And let's just say I want this readMobyDick()
method to be concurrent and handled by 3 threads, where:
Do I need to chunk Moby Dick up into three files and pass them each to their own task, or do I I just call readMobyDick()
from inside the implemented run()
method and (somehow) the Executor
knows how to break the work up amongst the threads.
I am a very visual learner, so any code examples of the right way to approach this are greatly appreciated! Thanks!
Multiple threads can also read data from the same FITS file simultaneously, as long as the file was opened independently by each thread. This relies on the operating system to correctly deal with reading the same file by multiple processes.
Multithreading also leads to minimization and more efficient use of computing resources. Application responsiveness is improved as requests from one thread do not block requests from other threads. Additionally, multithreading is less resource-intensive than running multiple processes at the same time.
You have probably chosen by accident the absolute worst example of parallel activities!
Reading in parallel from a single mechanical disk is actually slower than reading with a single thread, because you are in fact bouncing the mechanical head to different sections of the disk as each thread gets its turn to run. This is best left as a single threaded activity.
Let's take another example, which is similar to yours but can actually offer some benefit: assume I want to search for the occurrences of a certain word in a huge list of words (this list could even have come from a disk file, but like I said, read by a single thread). Assume I can use 3 threads like in your example, each searching on 1/3rd of the huge word list and keeping a local counter of how many times the searched word appeared.
In this case you'd want to partition the list in 3 parts, pass each part to a different object whose type implements Runnable and have the search implemented in the run
method.
The runtime itself has no idea how to do the partitioning or anything like that, you have to specify it yourself. There are many other partitioning strategies, each with its own strengths and weaknesses, but we can stick to the static partitioning for now.
Let's see some code:
class SearchTask implements Runnable {
private int localCounter = 0;
private int start; // start index of search
private int end;
private List<String> words;
private String token;
public SearchTask(int start, int end, List<String> words, String token) {
this.start = start;
this.end = end;
this.words = words;
this.token = token;
}
public void run() {
for(int i = start; i < end; i++) {
if(words.get(i).equals(token)) localCounter++;
}
}
public int getCounter() { return localCounter; }
}
// meanwhile in main :)
List<String> words = new ArrayList<String>();
// populate words
// let's assume you have 30000 words
// create tasks
SearchTask task1 = new SearchTask(0, 10000, words, "John");
SearchTask task2 = new SearchTask(10000, 20000, words, "John");
SearchTask task3 = new SearchTask(20000, 30000, words, "John");
// create threads for each task
Thread t1 = new Thread(task1);
Thread t2 = new Thread(task2);
Thread t3 = new Thread(task3);
// start threads
t1.start();
t2.start();
t3.start();
// wait for threads to finish
t1.join();
t2.join();
t3.join();
// collect results
int counter = 0;
counter += task1.getCounter();
counter += task2.getCounter();
counter += task3.getCounter();
This should work nicely. Note that in practical cases you would build a more generic partitioning scheme. You could alternatively use an ExecutorService
and implement Callable
instead of Runnable
if you wish to return a result.
So an alternative example using more advanced constructs:
class SearchTask implements Callable<Integer> {
private int localCounter = 0;
private int start; // start index of search
private int end;
private List<String> words;
private String token;
public SearchTask(int start, int end, List<String> words, String token) {
this.start = start;
this.end = end;
this.words = words;
this.token = token;
}
public Integer call() {
for(int i = start; i < end; i++) {
if(words.get(i).equals(token)) localCounter++;
}
return localCounter;
}
}
// meanwhile in main :)
List<String> words = new ArrayList<String>();
// populate words
// let's assume you have 30000 words
// create tasks
List<Callable> tasks = new ArrayList<Callable>();
tasks.add(new SearchTask(0, 10000, words, "John"));
tasks.add(new SearchTask(10000, 20000, words, "John"));
tasks.add(new SearchTask(20000, 30000, words, "John"));
// create thread pool and start tasks
ExecutorService exec = Executors.newFixedThreadPool(3);
List<Future> results = exec.invokeAll(tasks);
// wait for tasks to finish and collect results
int counter = 0;
for(Future f: results) {
counter += f.get();
}
You picked a bad example, as Tudor was so kind to point out. Spinning disk hardware is subject to physical constraints of moving platters and heads, and the most efficient read implementation is to read each block in order, which reduces the need to move the head or wait for the disk to align.
That said, some operating systems don't always store things continuously on disks, and for those who remember, defragmentation could provide a disk performance boost if you OS / filesystem didn't do the job for you.
As you mentioned wanting a program that would benefit, let me suggest a simple one, matrix addition.
Assuming you made one thread per core, you can trivially divide any two matrices to be added into N (one for each thread) rows. Matrix addition (if you recall) works as such:
A + B = C
or
[ a11, a12, a13 ] [ b11, b12, b13] = [ (a11+b11), (a12+b12), (a13+c13) ]
[ a21, a22, a23 ] + [ b21, b22, b23] = [ (a21+b21), (a22+b22), (a23+c23) ]
[ a31, a32, a33 ] [ b31, b32, b33] = [ (a31+b31), (a32+b32), (a33+c33) ]
So to distribute this across N threads, we simply need to take the row count and modulus divide by the number of threads to get the "thread id" it will be added with.
matrix with 20 rows across 3 threads
row % 3 == 0 (for rows 0, 3, 6, 9, 12, 15, and 18)
row % 3 == 1 (for rows 1, 4, 7, 10, 13, 16, and 19)
row % 3 == 2 (for rows 2, 5, 8, 11, 14, and 17)
// row 20 doesn't exist, because we number rows from 0
Now each thread "knows" which rows it should handle, and the results "per row" can be computed trivially because the results do not cross into other thread's domain of computation.
All that is needed now is a "result" data structure which tracks when the values have been computed, and when last value is set, then the computation is complete. In this "fake" example of a matrix addition result with two threads, computing the answer with two threads takes approximately half the time.
// the following assumes that threads don't get rescheduled to different cores for
// illustrative purposes only. Real Threads are scheduled across cores due to
// availability and attempts to prevent unnecessary core migration of a running thread.
[ done, done, done ] // filled in at about the same time as row 2 (runs on core 3)
[ done, done, done ] // filled in at about the same time as row 1 (runs on core 1)
[ done, done, .... ] // filled in at about the same time as row 4 (runs on core 3)
[ done, ...., .... ] // filled in at about the same time as row 3 (runs on core 1)
More complex problems can be solved by multithreading, and different problems are solved with different techniques. I purposefully picked one of the simplest examples.
you implement a Task or Callable with a run() or call() method (respectively), and it behooves you to parallelize as much of that implemented method as possible.
A Task
represents a discrete unit of work
Loading a file into memory is a discrete unit of work and can therefore this activity can be delegated to a background thread. I.e. a background thread runs this task of loading the file.
It is a discrete unit of work since it has no other dependencies needed in order to do its job (load the file) and has discrete boundaries.
What you are asking is to further divide this into task. I.e. a thread loads 1/3 of the file while another thread the 2/3 etc.
If you were able to divide the task into further subtasks then it would not be a task in the first place by definition. So loading a file is a single task by itself.
To give you an example:
Let's say that you have a GUI and you need to present to the user data from 5 different files. To present them you need also to prepare some data structures to process the actual data.
All these are separate tasks.
E.g. the loading of files is 5 different tasks so could be done by 5 different threads.
The preparation of the data structures could be done a different thread.
The GUI runs of course in another thread.
All these can happen concurrently
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With