
Static Collection update inside CompletableFuture#runAsync

Preconditions (generic description):

1. static class field

static List<String> ids = new ArrayList<>();

2. CompletableFuture#runAsync(Runnable runnable,Executor executor)

called within static void main(String args[]) method

3. elements are added to ids inside the runAsync call from step 2

Code snippet (specific description):

private static List<String> ids = new ArrayList<>();

public static void main(String[] args) throws ExecutionException, InterruptedException {
    //...
    final List<String> lines = Files.lines(path).collect(Collectors.toList());
    for (List<String> chunk : CollectionUtils.split(1024, lines)) {
        CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
            List<User> users = buildUsers();
            populate(users);
        }, executorService);

        futures.add(future);
    }
}

private static void populate(List<User> users) {
    //...
    ids.add(user.getId()); // user is an element of users; unsynchronized write to the shared list
    //...
}

Problem description:

As I understand it, from a concurrency point of view a static variable could NOT be shared between threads, so some data might be lost.

Should it be declared volatile, or would it be reasonable to use a ConcurrentSkipListSet<String>?

sergionni asked Jan 03 '18 10:01



2 Answers

Based on the code snippet:

  • volatile is not needed here because it works at the reference level: the tasks don't reassign the reference to the collection object, they mutate its state. If the reference itself were reassigned, either volatile or AtomicReference could be used.

  • A static object can be shared between threads, but the object must be thread-safe. A concurrent collection will do the job for light to medium load.
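To illustrate the second point, here is a minimal sketch rather than the asker's actual code. The class name SharedIdsDemo, the pool size of 4, and the generated ids are all assumptions made for the example. The static field is wrapped with Collections.synchronizedList, so concurrent add calls from the runAsync tasks are not lost:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical demo class; names and sizes are illustrative assumptions.
class SharedIdsDemo {
    // Thread-safe wrapper: every add() synchronizes on the wrapper object.
    private static final List<String> ids =
            Collections.synchronizedList(new ArrayList<>());

    static int run(int tasks, int idsPerTask) {
        ids.clear();
        ExecutorService executorService = Executors.newFixedThreadPool(4);
        try {
            List<CompletableFuture<Void>> futures = new ArrayList<>();
            for (int t = 0; t < tasks; t++) {
                final int task = t;
                futures.add(CompletableFuture.runAsync(() -> {
                    for (int i = 0; i < idsPerTask; i++) {
                        ids.add(task + "-" + i); // concurrent writes to the shared list
                    }
                }, executorService));
            }
            // Wait for every task to finish before reading the shared list.
            CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
            return ids.size();
        } finally {
            executorService.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(run(8, 1000)); // 8 tasks x 1000 ids each
    }
}
```

With a plain ArrayList in place of the wrapper, the same run may report fewer elements or throw an ArrayIndexOutOfBoundsException, depending on timing.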

But the modern way to do this would involve streams instead of using a shared collection:

List<CompletableFuture<List<String>>> futures = lines.stream()
        .map(line -> CompletableFuture.supplyAsync(() -> buildUsers().stream()
                                                                     .map(User::getId)
                                                                     .collect(Collectors.toList()),
             executorService))
        .collect(Collectors.toList());

ids.addAll(futures.stream()
                  .map(CompletableFuture::join)
                  .flatMap(List::stream)
                  .collect(Collectors.toList()));

jihor answered Nov 12 '22 10:11


In your particular case there are ways to guarantee thread safety for ids:

  1. Use a thread-safe collection (for example, ConcurrentSkipListSet, CopyOnWriteArrayList, Collections.synchronizedList(new ArrayList<>()), or Collections.newSetFromMap(new ConcurrentHashMap<>()));
  2. Use synchronization as shown below.

Examples of synchronized:

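// Option A: synchronize the whole method (for a static method the lock is the class object)
private static synchronized void populate(List<User> users){
  //...
  ids.add(user.getId());
  //...
}

// Option B: synchronize only the write, using the list itself as the lock
private static void populate(List<User> users){
  //...
  synchronized (ids) {
      ids.add(user.getId());
  }
  //...
}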

I assume that Collections.newSetFromMap(new ConcurrentHashMap<>()) would be the fastest option if you expect a lot of user ids. Otherwise, ConcurrentSkipListSet, which you are already familiar with, would do.
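A minimal sketch of the ConcurrentHashMap-backed set, under stated assumptions: the class name ConcurrentIdSetDemo and the "user-N" ids are made up for the illustration. Unlike a list, a set drops duplicate ids and keeps no insertion order, which may or may not suit your data:

```java
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical demo class; names and sizes are illustrative assumptions.
class ConcurrentIdSetDemo {
    static int collect(int tasks, int idsPerTask) {
        // Concurrent set view backed by a ConcurrentHashMap.
        Set<String> ids = Collections.newSetFromMap(new ConcurrentHashMap<>());
        ExecutorService executorService = Executors.newFixedThreadPool(4);
        try {
            CompletableFuture<?>[] futures = new CompletableFuture<?>[tasks];
            for (int t = 0; t < tasks; t++) {
                futures[t] = CompletableFuture.runAsync(() -> {
                    for (int i = 0; i < idsPerTask; i++) {
                        ids.add("user-" + i); // every task adds the same ids
                    }
                }, executorService);
            }
            CompletableFuture.allOf(futures).join(); // wait for all writers
            return ids.size(); // duplicates collapse, so this equals idsPerTask
        } finally {
            executorService.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(collect(4, 500));
    }
}
```

Since Java 8, ConcurrentHashMap.newKeySet() gives an equivalent set with less ceremony.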

volatile is a bad option here. It guarantees visibility, but not atomicity. A typical example of volatile usage is:

 volatile int a = 1;

 void threadOne() {
      if (a == 1) {
           // do something
      }
 }

 void threadTwo() {
      // do something 
      a = 2;
 }

In that case, each thread performs only a single read or a single write. Because "a" is volatile, each thread is guaranteed to see (read) a complete, up-to-date value of exactly 1 or 2. Another (bad) example:

 void threadOne() {
      if (a == 1) {
           // do something
           a++;
      }
 }

 void threadTwo() {
      if (a == 1) {
           // do something
           a = 2;
      } else if (a == 2) {
           a++;
      }
 }

Here we perform an increment (a read followed by a write), and the final value of a can differ from run to run because the operation is not atomic. That is why AtomicInteger, AtomicLong, etc. exist. In your case, all threads would see the current value of the ids reference, but they would write different values concurrently. If you look inside the add method of ArrayList, you will see something like:

elementData[size++] = e;

So nothing guarantees that the update of size is atomic: two threads could write different ids into the same array cell.
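The difference between visibility and atomicity can be sketched as follows. This is an illustration only: the class name AtomicCounterDemo and the thread and iteration counts are assumptions. Both counters are incremented the same number of times, but only the AtomicInteger is guaranteed to end up at the exact total:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class; names and sizes are illustrative assumptions.
class AtomicCounterDemo {
    private static volatile int volatileCounter = 0;
    private static final AtomicInteger atomicCounter = new AtomicInteger();

    // Returns {final volatile value, final atomic value}.
    static int[] run(int tasks, int incrementsPerTask) {
        volatileCounter = 0;
        atomicCounter.set(0);
        CompletableFuture<?>[] futures = new CompletableFuture<?>[tasks];
        for (int t = 0; t < tasks; t++) {
            futures[t] = CompletableFuture.runAsync(() -> {
                for (int i = 0; i < incrementsPerTask; i++) {
                    volatileCounter++;               // read, add, write: updates can be lost
                    atomicCounter.incrementAndGet(); // single atomic read-modify-write
                }
            });
        }
        CompletableFuture.allOf(futures).join(); // wait for all tasks
        return new int[] {volatileCounter, atomicCounter.get()};
    }

    public static void main(String[] args) {
        int[] result = run(8, 100_000);
        System.out.println("volatile: " + result[0] + ", atomic: " + result[1]);
    }
}
```

The atomic counter always reaches tasks * incrementsPerTask. The volatile counter can never exceed that total and often falls short of it, although the exact shortfall depends on timing.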

egorlitvinenko answered Nov 12 '22 10:11