Preconditions (generic description):
1. A static class field: static List<String> ids = new ArrayList<>();
2. CompletableFuture#runAsync(Runnable runnable, Executor executor) is called within the static void main(String args[]) method.
3. Elements are added to the collection from step 1 inside the runAsync call from step 2.
Code snippet (specific description):
    private static List<String> ids = new ArrayList<>();

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        //...
        final List<String> lines = Files.lines(path).collect(Collectors.toList());
        // Process the file in chunks of 1024 lines, one asynchronous task per chunk
        for (List<String> chunk : CollectionUtils.split(1024, lines)) {
            CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
                List<User> users = buildUsers();
                populate(users);
            }, executorService);
            futures.add(future);
        }
    }

    private static void populate(List<User> users) {
        //...
        ids.add(user.getId()); // called from several threads on the shared list
        //...
    }
Problem description:
As I understand it, from a concurrency point of view a static variable cannot be safely shared between threads, so data could be lost in some way.
Should it be changed to volatile, or would it be reasonable to use ConcurrentSkipListSet<String>?
CompletableFuture is used for asynchronous programming: it lets us write non-blocking code by running a task on a thread other than the main application thread and letting the caller react to its completion or failure. CompletableFuture is inspired by ListenableFuture in Guava and is similar to Promise in JavaScript.
The difference between runAsync() and supplyAsync() is that the former takes a Runnable and returns a CompletableFuture<Void>, while supplyAsync() returns a CompletableFuture carrying the value produced by the Supplier. Both methods also accept a second argument, a custom Executor to submit tasks to.
By default, CompletableFuture executes these tasks on a thread obtained from the global ForkJoinPool.commonPool(). You can also create your own thread pool and pass it to runAsync() and supplyAsync() so that the tasks run on a thread from that pool.
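The difference between the two factory methods can be seen in a minimal sketch. The class name AsyncBasics and its method names are hypothetical and chosen only for illustration; the pool size of 2 is an arbitrary assumption.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical demo class, not part of the original question.
class AsyncBasics {

    // supplyAsync: the Supplier's return value becomes the future's result.
    static String supplyGreeting(ExecutorService executor) {
        CompletableFuture<String> future =
                CompletableFuture.supplyAsync(() -> "hello", executor);
        return future.join(); // blocks until the task completes
    }

    // runAsync: the Runnable returns nothing, so the future is a CompletableFuture<Void>.
    static Void runSideEffect(ExecutorService executor) {
        CompletableFuture<Void> future =
                CompletableFuture.runAsync(() -> System.out.println("working"), executor);
        return future.join(); // a Void future always completes with null
    }

    public static void main(String[] args) {
        // Custom pool passed as the second argument instead of the common pool
        ExecutorService executor = Executors.newFixedThreadPool(2);
        try {
            System.out.println(supplyGreeting(executor));
            runSideEffect(executor);
        } finally {
            executor.shutdown();
        }
    }
}
```

Omitting the executor argument would run both tasks on ForkJoinPool.commonPool() instead.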
Based on the code snippet:
volatile is not required here because it works at the reference level, and the tasks don't reassign the reference to the collection object; they mutate its state. If the reference itself were updated, either volatile or AtomicReference could be used.
A static object can be shared between threads, but the object must be thread-safe. A concurrent collection will do the job for light to medium load.
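As a rough sketch of the shared-collection approach, the list below is wrapped with Collections.synchronizedList so that each add() is synchronized. The class SharedIdsDemo, the method collectIds, the generated "user-..." ids and the pool size of 4 are all assumptions made for this example, not code from the question.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical demo class, not part of the original question.
class SharedIdsDemo {

    static List<String> collectIds(int tasks, int idsPerTask) {
        // Thread-safe wrapper: every add() locks on the list itself
        List<String> ids = Collections.synchronizedList(new ArrayList<>());
        ExecutorService executor = Executors.newFixedThreadPool(4);
        try {
            List<CompletableFuture<Void>> futures = new ArrayList<>();
            for (int t = 0; t < tasks; t++) {
                final int task = t;
                futures.add(CompletableFuture.runAsync(() -> {
                    for (int i = 0; i < idsPerTask; i++) {
                        ids.add("user-" + task + "-" + i);
                    }
                }, executor));
            }
            // Wait for every task before reading the shared list
            CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
            return ids;
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(collectIds(8, 1000).size());
    }
}
```

With a plain ArrayList in place of the synchronized wrapper, the same code could lose elements or throw ArrayIndexOutOfBoundsException under contention.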
But the modern way to do this would involve streams instead of using a shared collection:
    List<CompletableFuture<List<String>>> futures = lines.stream()
            .map(line -> CompletableFuture.supplyAsync(() -> buildUsers().stream()
                            .map(User::getId)
                            .collect(Collectors.toList()),
                    executorService))
            .collect(Collectors.toList());

    ids.addAll(futures.stream()
            .map(CompletableFuture::join)
            .flatMap(List::stream)
            .collect(Collectors.toList()));
In your particular case there are ways to guarantee thread safety for ids:
Examples of synchronized:
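    // Option 1: synchronize the whole method
    private static synchronized void populate(List<User> users) {
        //...
        ids.add(user.getId());
        //...
    }

    // Option 2: synchronize only the block that touches ids
    private static void populate(List<User> users) {
        //...
        synchronized (ids) {
            ids.add(user.getId());
        }
        //...
    }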
I assume that Collections.newSetFromMap(new ConcurrentHashMap<>()) would be the fastest option if you expect a lot of user ids. Otherwise, you could use the ConcurrentSkipListSet you are already familiar with.
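The two set options mentioned above might be created as follows. This is only a sketch: the class ConcurrentIdSets and its method names are hypothetical. Note that switching from a List to a Set also changes behavior, since duplicate ids are dropped.

```java
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListSet;

// Hypothetical demo class, not part of the original question.
class ConcurrentIdSets {

    // Hash-based concurrent set: no ordering guarantee
    static Set<String> hashBased() {
        return Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());
    }

    // Skip-list-based concurrent set: elements are kept in sorted order
    static Set<String> sorted() {
        return new ConcurrentSkipListSet<>();
    }

    public static void main(String[] args) {
        Set<String> ids = sorted();
        ids.add("b");
        ids.add("a");
        ids.add("a"); // duplicate, ignored by the set
        System.out.println(ids);
    }
}
```

Since Java 8, ConcurrentHashMap.newKeySet() is a shorter way to obtain the same kind of hash-based concurrent set.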
volatile is a bad option here. volatile guarantees visibility, but not atomicity. A typical example of volatile usage is:
    volatile int a = 1;

    void threadOne() {
        if (a == 1) {
            // do something
        }
    }

    void threadTwo() {
        // do something
        a = 2;
    }
In that case, each thread performs only a single read or a single write. Because a is volatile, every thread is guaranteed to read the most recently written value, which is exactly 1 or 2. Another (bad) example:
    void threadOne() {
        if (a == 1) {
            // do something
            a++;
        }
    }

    void threadTwo() {
        if (a == 1) {
            // do something
            a = 2;
        } else if (a == 2) {
            a++;
        }
    }
Here we perform an increment (a read followed by a write), and a could end up with different results because the operation is not atomic. That is why AtomicInteger, AtomicLong, etc. exist. In your case, all threads would see the current value of the ids reference, but they would write different values, and if you look inside the add method of ArrayList, you will see something like:
    elementData[size++] = e;
So nothing guarantees the atomicity of the size update, and two threads could write different ids into the same array cell.
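For comparison, a sketch of how AtomicInteger makes both the increment and the check-then-set from the bad example atomic. The class AtomicCounterDemo, its method names and the pool size of 4 are hypothetical and only illustrate the idea.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class, not part of the original question.
class AtomicCounterDemo {

    static int countWithAtomic(int tasks, int incrementsPerTask) {
        AtomicInteger counter = new AtomicInteger();
        ExecutorService executor = Executors.newFixedThreadPool(4);
        try {
            CompletableFuture<?>[] futures = new CompletableFuture<?>[tasks];
            for (int t = 0; t < tasks; t++) {
                futures[t] = CompletableFuture.runAsync(() -> {
                    for (int i = 0; i < incrementsPerTask; i++) {
                        counter.incrementAndGet(); // atomic read-modify-write
                    }
                }, executor);
            }
            CompletableFuture.allOf(futures).join(); // wait for all tasks
            return counter.get();
        } finally {
            executor.shutdown();
        }
    }

    // Atomic replacement for "if (a == 1) a = 2": only one caller can succeed
    static boolean moveFromOneToTwo(AtomicInteger a) {
        return a.compareAndSet(1, 2);
    }

    public static void main(String[] args) {
        System.out.println(countWithAtomic(8, 10_000));
    }
}
```

A volatile int with a++ in place of incrementAndGet() could lose updates, because the read and the write are separate steps.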