The problem is that the rather limited parallelism you have configured is eaten up by the outer stream processing: if you say that you want eight threads and process a stream of more than eight items with parallel(), it will create eight worker threads and let them process items of the outer stream.
Then, within your consumer, you are processing another stream using parallel(), but there are no worker threads left. Since the worker threads are blocked waiting for the end of the inner stream processing, the ForkJoinPool has to create new worker threads, which violates your configured parallelism. It seems to me that it does not recycle these extra threads but lets them die right after processing. So within your inner processing, new threads are created and disposed of, which is an expensive operation.
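To make this visible, here is a minimal sketch, not your code, just an assumed setup where the common pool's parallelism is limited to eight via -Djava.util.concurrent.ForkJoinPool.common.parallelism=8, that records the name of every thread executing an inner item. If the extra thread creation described above happens, the printed count will exceed the configured eight:

import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.IntStream;

public class ExtraWorkerThreadDemo {
    // run with -Djava.util.concurrent.ForkJoinPool.common.parallelism=8
    public static void main(String[] args) {
        Set<String> threadNames = ConcurrentHashMap.newKeySet();
        IntStream.range(0, 24).parallel().forEach(i -> {
            sleep(100);                                        // some outer work
            IntStream.range(0, 100).parallel().forEach(j -> {  // nested parallel stream
                threadNames.add(Thread.currentThread().getName());
                sleep(10);                                     // some inner work
            });
        });
        // with nested parallel streams this number typically exceeds the configured parallelism
        System.out.println("distinct threads used: " + threadNames.size());
    }

    static void sleep(long millis) {
        try { Thread.sleep(millis); } catch(InterruptedException ex) { throw new AssertionError(ex); }
    }
}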
You might see it as a flaw that the initiating threads do not contribute to the computation of a parallel stream but just wait for the result, but even if that were fixed, you would still have a general problem that is hard (if not impossible) to fix:
Whenever the ratio of worker threads to outer stream items is low, the implementation will use all of them for the outer stream, as it does not know that this stream is an outer stream. So executing an inner stream in parallel requests more worker threads than are available. Using the caller thread to contribute to the computation could fix it in a way that the performance equals that of the serial computation, but getting an actual advantage out of parallel execution here does not work well with the concept of a fixed number of worker threads.
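To put numbers on this, take the setup of the test below: 24 outer items, each consisting of a 10 second task plus 100 inner tasks of 10 ms, i.e. 11 seconds per outer item and 264 seconds of work in total. With eight fully utilized threads the theoretical optimum is 264 / 8 = 33 seconds, but you already reach those 33 seconds when every inner stream runs sequentially on its outer worker (three waves of 11 seconds each). In this constellation the inner parallelism cannot improve on sequential inner execution; it can only add overhead.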
Note that you are only scratching the surface of this problem here, as you have rather balanced processing times for the items. If the processing times of inner items and outer items diverge (compared to items on the same level), the problem will get even worse.
Update: by profiling and looking at the code, it seems that the ForkJoinPool does attempt to use the waiting thread for “work stealing”, but uses different code depending on whether the Thread is a worker thread or some other thread. As a result, a worker thread is actually waiting about 80% of the time and doing very little to no work, while other threads really contribute to the computation…
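If you want to reproduce this observation without a profiler, a crude way is to sample the thread states yourself. The following sketch is my own helper, not part of the test; the class name and the 50 ms sampling interval are arbitrary. Started as a daemon thread before the original nested parallel stream run, it records how often each ForkJoinPool thread was found RUNNABLE versus waiting:

import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.LongAdder;

public class WorkerStateSampler implements Runnable {
    final ThreadMXBean mx = ManagementFactory.getThreadMXBean();
    final Map<String,LongAdder> runnable = new ConcurrentHashMap<>();
    final Map<String,LongAdder> samples  = new ConcurrentHashMap<>();

    @Override public void run() {
        while(!Thread.currentThread().isInterrupted()) {
            for(ThreadInfo ti: mx.dumpAllThreads(false, false)) {
                if(!ti.getThreadName().contains("ForkJoinPool")) continue;
                samples.computeIfAbsent(ti.getThreadName(), n -> new LongAdder()).increment();
                if(ti.getThreadState() == Thread.State.RUNNABLE)
                    runnable.computeIfAbsent(ti.getThreadName(), n -> new LongAdder()).increment();
            }
            try { Thread.sleep(50); } catch(InterruptedException ex) { return; } // sampling interval
        }
    }

    void report() { // print, per worker thread, how many samples found it actually running
        samples.forEach((name, total) -> {
            long busy = runnable.getOrDefault(name, new LongAdder()).sum();
            System.out.printf("%s: runnable in %d of %d samples%n", name, busy, total.sum());
        });
    }
}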
Update 2: for completeness, here is the simple parallel execution approach described in the comments. Since it enqueues every item, it is expected to have too much overhead when the execution time for a single item is rather small. So it's not a sophisticated solution, but rather a demonstration that it is possible to handle long-running tasks without much magic…
import java.lang.reflect.UndeclaredThrowableException;
import java.util.concurrent.*;
import java.util.function.IntConsumer;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
public class NestedParallelForEachTest1 {

    static final boolean isInnerStreamParallel = true;

    // Setup: inner loop task 0.01 sec in the worst case. Outer loop task: 10 sec + inner loop.
    // This setup: (100 * 0.01 sec + 10 sec) * 24/8 = 33 sec.
    static final int numberOfTasksInOuterLoop = 24;  // in real applications this can be a large number (e.g. > 1000)
    static final int numberOfTasksInInnerLoop = 100; // in real applications this can be a large number (e.g. > 1000)
    static final int concurrentExecutionsLimitForStreams = 8;

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        System.out.println(System.getProperty("java.version")+" "+System.getProperty("java.home"));
        new NestedParallelForEachTest1().testNestedLoops();
        E.shutdown();
    }

    // fixed-size pool without task buffering; the rejection handler (r,e)->r.run()
    // executes a task in the submitting thread when no pool thread is free
    final static ThreadPoolExecutor E = new ThreadPoolExecutor(
        concurrentExecutionsLimitForStreams, concurrentExecutionsLimitForStreams,
        2, TimeUnit.MINUTES, new SynchronousQueue<>(), (r,e)->r.run() );

    public static void parallelForEach(IntStream s, IntConsumer c) {
        // submit every item as its own task, then wait for all results
        s.mapToObj(i->E.submit(()->c.accept(i))).collect(Collectors.toList())
            .forEach(NestedParallelForEachTest1::waitOrHelp);
    }

    static void waitOrHelp(Future<?> f) {
        // while waiting for the result, run any task found in the executor's queue
        // in the waiting thread instead of just blocking
        while(!f.isDone()) {
            Runnable r=E.getQueue().poll();
            if(r!=null) r.run();
        }
        try { f.get(); }
        catch(InterruptedException ex) { throw new RuntimeException(ex); }
        catch(ExecutionException eex) {
            Throwable t=eex.getCause();
            if(t instanceof RuntimeException) throw (RuntimeException)t;
            if(t instanceof Error) throw (Error)t;
            throw new UndeclaredThrowableException(t);
        }
    }

    public void testNestedLoops() {
        long start = System.nanoTime();

        // outer loop
        parallelForEach(IntStream.range(0,numberOfTasksInOuterLoop), i -> {
            if(i < 10) sleep(10 * 1000);

            if(isInnerStreamParallel) {
                // inner loop as parallel: worst case (sequential) it takes 10 * numberOfTasksInInnerLoop millis
                parallelForEach(IntStream.range(0,numberOfTasksInInnerLoop), j -> sleep(10));
            }
            else {
                // inner loop as sequential
                IntStream.range(0,numberOfTasksInInnerLoop).sequential().forEach(j -> sleep(10));
            }

            if(i >= 10) sleep(10 * 1000);
        });

        long end = System.nanoTime();
        System.out.println("Done in "+TimeUnit.NANOSECONDS.toSeconds(end-start)+" sec.");
    }

    static void sleep(int milli) {
        try {
            Thread.sleep(milli);
        } catch(InterruptedException ex) {
            throw new AssertionError(ex);
        }
    }
}