
Java 8 parallelStream for concurrent Database / REST call

Here I am using a Java parallel stream to iterate through a List and make a REST call with each list element as input. I need to add all the results of the REST calls to a collection, for which I am using an ArrayList. The code given below works, except that ArrayList is not thread-safe, so concurrent adds from multiple threads could produce incorrect results, and adding the needed synchronization would cause contention, undermining the benefit of parallelism.

Can someone please suggest a proper way of using a parallel stream for my case?

public void myMethod() {
    List<List<String>> partitions = getInputData();
    final List<String> allResult = new ArrayList<String>();
    partitions.parallelStream().forEach(serverList -> callRestAPI(serverList, allResult));
}

private void callRestAPI(List<String> serverList, List<String> allResult) {
    List<String> result = //Do a REST call.
    allResult.addAll(result);
}
Zeeshan asked Jun 10 '15 at 13:06





1 Answer

You can do the operation with map instead of forEach. That avoids shared mutable state: each call returns its own list, and collect merges the results in a thread-safe way (it is also cleaner from a functional programming perspective):

List<String> allResult = partitions.parallelStream()
          .map(this::callRestAPI)          // each call returns its own list
          .flatMap(List::stream)           // flattens the lists
          .collect(Collectors.toList());   // needs import java.util.stream.Collectors

And your callRestAPI method:

private List<String> callRestAPI(List<String> serverList) {
    List<String> result = //Do a REST call.
    return result;
}
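
For reference, here is a minimal, self-contained sketch of the approach above. The class name ParallelRestDemo, the method fetchAll, and the fake "response-from-" strings are assumptions made for illustration; the body of callRestAPI is only a stand-in for the real REST call, which the question does not show.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class ParallelRestDemo {

    // Hypothetical stand-in for the real REST call:
    // produces one fake response string per server name.
    static List<String> callRestAPI(List<String> serverList) {
        List<String> result = new ArrayList<>();
        for (String server : serverList) {
            result.add("response-from-" + server);
        }
        return result;
    }

    // Each partition is processed independently and returns its own list;
    // collect() merges the per-thread results, so no shared list is mutated.
    static List<String> fetchAll(List<List<String>> partitions) {
        return partitions.parallelStream()
                .map(ParallelRestDemo::callRestAPI)
                .flatMap(List::stream)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<List<String>> partitions = Arrays.asList(
                Arrays.asList("a", "b"),
                Arrays.asList("c"),
                Arrays.asList("d", "e"));
        System.out.println(fetchAll(partitions));
    }
}
```

Because the source is a List (an ordered stream) and Collectors.toList() respects encounter order, the results come back in the same order as the input partitions even though the calls may run concurrently.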
assylias answered Sep 18 '22 at 01:09
