
Java Parallel Streams close thread

I have written a method using Java streams which simply iterates over a list of objects and returns true/false depending on whether a certain condition is satisfied.

Java Method:

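 boolean method(SampleObj sampleObj) {

   List<SampleObj> testList = invokeSomeMethod();
   // count() takes no arguments and returns a long,
   // so the second condition becomes another filter
   long result = testList
            .parallelStream()
            .filter(listObj -> listObj.getAttr() == 1)
            .filter(listObj -> listObj.isAttr4())
            .count();

   return result > 10;

 }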

I have written a mock test case for this method as well. When I execute the test case, the test succeeds; however, I get a project custom error stating that not all threads that were created have been shut down.

I even tried using the stream with try-with-resources, but that did not help either.

Mock Test:

@Test
public void testSomeMethod() {
    SampleObj sampleObj1 = new SampleObj(10, 20, 30, true);
    SampleObj sampleObj2 = new SampleObj(10, 20, 30, true);
    SampleObj sampleObj3 = new SampleObj(10, 20, 30, false);
    SampleObj sampleObjTest = new SampleObj(10, 20, 30, true);

    List<SampleObj> testList = new ArrayList<SampleObj>();
    testList.add(sampleObj1);
    testList.add(sampleObj2);
    testList.add(sampleObj3);

    when(mockedAttribute.invokeSomeMethod()).thenReturn(testList);

    ClassToBeTested classTest = createGenericMockRules();
    Assert.assertTrue(classTest.method(sampleObjTest));
}

P.S. I have debugged to confirm that when invokeSomeMethod() is called, my mocked testList is returned.

As far as I know, Java streams internally close the threads they create. Am I implementing this incorrectly?

alwaysAStudent asked Feb 28 '17 23:02




1 Answer

Java streams do not create threads and thus do not dispose of them either. They use a thread pool internally; which one is unspecified, but it is well known to be the common pool of the Fork/Join framework.
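
A minimal sketch to observe this, assuming a typical OpenJDK runtime: it records which threads process the elements of a parallel stream. The class and method names (CommonPoolDemo, workerNames) are made up for illustration. On common JDKs the output usually contains the calling thread plus names such as ForkJoinPool.commonPool-worker-1, but the exact names and their number are implementation details.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.IntStream;

class CommonPoolDemo {
    // Returns the names of all threads that processed elements of a parallel stream.
    static Set<String> workerNames() {
        Set<String> names = ConcurrentHashMap.newKeySet(); // thread-safe set
        IntStream.range(0, 10_000)
                 .parallel()
                 .forEach(i -> names.add(Thread.currentThread().getName()));
        return names;
    }

    public static void main(String[] args) {
        workerNames().forEach(System.out::println);
    }
}
```

The stream itself never starts or stops a thread here; it only hands work to threads the pool already manages.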

The whole intention of using thread pools is to let the pool manage the threads, instead of creating and disposing of threads for each job. Creating and destroying threads has a cost, which should be avoided when multiple jobs are enqueued one after another. In particular, a thread’s creation time adds to the job’s execution time if there is no existing thread to pick it up. In other words, it is normal and intended that the threads live longer than the jobs. They are waiting for new jobs that could arrive.

The class documentation of ForkJoinPool states:

A static commonPool() is available and appropriate for most applications. The common pool is used by any ForkJoinTask that is not explicitly submitted to a specified pool. Using the common pool normally reduces resource usage (its threads are slowly reclaimed during periods of non-use, and reinstated upon subsequent use).

It doesn’t specify how long a thread has to be idle before it gets reclaimed, other than “slowly”, so this may even vary from implementation to implementation. For the current implementation, it can’t even be expressed as a single timeout: rather than terminating all idle threads after a timeout, the pool shrinks the number of threads, and the remaining threads wait again, with an increased timeout, until the pool shrinks again, and so on until no idle thread is left. In other words, the more threads the pool has, the longer it takes until the last thread is reclaimed when all threads are idle.

You can force your tests to wait for the end of all threads via

while(ForkJoinPool.commonPool().getPoolSize()>0)
    LockSupport.parkNanos(1000);
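
If you do go this route, a bounded variant of the loop above may be safer for a test suite, since it cannot hang forever. This is only a sketch: the class and method names (CommonPoolWait, awaitCommonPoolIdle) and the 10 ms polling interval are arbitrary choices for illustration, not part of any library.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;

class CommonPoolWait {
    // Polls until the common pool has no worker threads left or the timeout elapses.
    // Returns true if the pool became empty in time, false on timeout.
    static boolean awaitCommonPoolIdle(long timeout, TimeUnit unit) {
        long deadline = System.nanoTime() + unit.toNanos(timeout);
        while (ForkJoinPool.commonPool().getPoolSize() > 0) {
            if (System.nanoTime() - deadline >= 0) {
                return false; // gave up waiting
            }
            LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(10));
        }
        return true;
    }
}
```

A test teardown could call awaitCommonPoolIdle(2, TimeUnit.MINUTES) and decide what to do when it returns false.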

but this can increase the execution time of your tests significantly, e.g. on the order of a minute with eight cores/threads. The better solution would be to rethink your “project custom error” check, which simply shouldn’t hold your code responsible for threads created by internally used pools.

Otherwise, you could get similar errors when using, e.g., asynchronous I/O.
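
How the project’s check is implemented is not shown in the question, so the following is only a hypothetical sketch of what such a check could look like if it excluded common pool workers. The names ThreadLeakCheck, isCommonPoolWorker and leakedSince are invented here; the sketch relies only on ForkJoinWorkerThread.getPool() and Thread.getAllStackTraces() from the JDK.

```java
import java.util.List;
import java.util.Set;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinWorkerThread;
import java.util.stream.Collectors;

class ThreadLeakCheck {
    // True if the thread is a worker owned by the shared common pool.
    static boolean isCommonPoolWorker(Thread t) {
        return t instanceof ForkJoinWorkerThread
            && ((ForkJoinWorkerThread) t).getPool() == ForkJoinPool.commonPool();
    }

    // Live threads that were not in 'before' and do not belong to the common pool.
    static List<Thread> leakedSince(Set<Thread> before) {
        return Thread.getAllStackTraces().keySet().stream()
            .filter(Thread::isAlive)
            .filter(t -> !before.contains(t))
            .filter(t -> !isCommonPoolWorker(t))
            .collect(Collectors.toList());
    }
}
```

A test harness would snapshot Thread.getAllStackTraces().keySet() before the test and report leakedSince(snapshot) afterwards; threads from other shared pools, such as those used for asynchronous I/O, would need similar exclusions.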

Holger answered Oct 17 '22 14:10