Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Can a non-thread-safe value be safely ported across thread boundaries using fork/join?

I have some class which is not thread safe:

class ThreadUnsafeClass {
  long i;

  long incrementAndGet() { return ++i; }
}

(I've used a long as the field here, but we should think of its field as being some thread-unsafe type).

I now have a class which looks like this

class Foo {
  final ThreadUnsafeClass c;

  Foo(ThreadUnsafeClass c) {
    this.c = c;
  }
}

That is, the thread unsafe class is a final field of it. Now I'm going to do this:

public class JavaMM {
  public static void main(String[] args) {
    final ForkJoinTask<ThreadUnsafeClass> work = ForkJoinTask.adapt(() -> {
      ThreadUnsafeClass t = new ThreadUnsafeClass();
      t.incrementAndGet();
      return new FC(t);
    });

    assert (work.fork().join().c.i == 1); 
  }
}

That is, from thread T (main), I invoke some work on T' (the fork-join-pool) which creates and mutates an instance of my unsafe class and then returns the result wrapped in a Foo. Please note that all mutation of my thread unsafe class happens on a single thread, T'.

Question 1: Am I guaranteed that the end-state of the instance of the thread-unsafe-class is safely ported across the T' ~> T thread boundary at the join?

Question 2: What if I had done this using parallel streams? For example:

Map<Long, Foo> results = 
  Stream
    .of(new ThreadUnsafeClass())
    .parallel()
    .map(tuc -> {
      tuc.incrementAndGet();
      return new Foo(tuc);
    })
    .collect(
      Collectors.toConcurrentMap(
        foo -> foo.c.i,
        Function.identity();
      )
    );
assert(results.get(1) != null)
like image 632
oxbow_lakes Avatar asked Jan 11 '18 11:01

oxbow_lakes


People also ask

When we should use fork and join?

The fork/join framework was designed to speed up the execution of tasks that can be divided into other smaller subtasks, executing them in parallel and then combining their results to get a single one.

How is fork join different than executor service?

Fork Join is an implementation of ExecuterService. The main difference is that this implementation creates a DEQUE worker pool. Executor service creates asked number of thread, and apply a blocking queue to store all the remaining waiting task.

What is a fork join thread pool?

ForkJoinPool It is an implementation of the ExecutorService that manages worker threads and provides us with tools to get information about the thread pool state and performance. Worker threads can execute only one task at a time, but the ForkJoinPool doesn't create a separate thread for every single subtask.

What is the significance of using fork join framework in thread concurrency in Java?

The fork/join framework is an implementation of the ExecutorService interface that helps you take advantage of multiple processors. It is designed for work that can be broken into smaller pieces recursively. The goal is to use all the available processing power to enhance the performance of your application.


1 Answers

I think ForkJoinTask.join() has the same memory effects as Future.get() (because it says in join() Javadoc that is is basically get() with interruption and exception differences). And Future.get() is specified as:

Actions taken by the asynchronous computation represented by a Future happen-before actions subsequent to the retrieval of the result via Future.get() in another thread.

In other words, this is basically a "safe publication" via Future/FJT. Which means, anything that the executor thread did and published via FJT result is visible to FJT.join() users. Since the example allocates the object and populates its field only within the executor thread, and nothing happens with the object after it gets returned from the executor, it stands to reason that we are only allowed to see the values the executor thread produced.

Note that putting the whole thing via final does not bring any additional benefit to it. Even if you just did the plain field stores, you would still be guaranteed this:

public static void main(String... args) throws Exception {
    ExecutorService s = Executors.newCachedThreadPool();
    Future<MyObject> f = s.submit(() -> new MyObject(42));
    assert (f.get().x == 42); // guaranteed!
    s.shutdown();
}

public class MyObject {
    int x;
    public MyObject(int x) { this.x = x; }
}

But notice that in the Stream example (if we assume the symmetry between Stream.of.parallel and Executor.submit, and between Stream.collect and FJT.join/Future.get), you have created the object in the caller thread, then passed it to executor to do something. This is a subtle difference, but it does not matter much still, because we also have HB on submit, that preclude seeing the old state of the object:

public static void main(String... args) throws Exception {
    ExecutorService s = Executors.newCachedThreadPool();
    MyObject o = new MyObject(42);
    Future<?> f = s.submit(() -> o.x++); // new --hb--> submit
    f.get(); // get -->hb--> read o.x
    assert (o.x == 43); // guaranteed
    s.shutdown();
}

public static class MyObject {
    int x;
    public MyObject(int x) { this.x = x; }
}

(In formal speak, that is because all the HB paths from read(o.x) go via the action of the executor thread that does store(o.x, 43))

like image 179
Aleksey Shipilev Avatar answered Sep 30 '22 01:09

Aleksey Shipilev