The **Fork/Join framework** is a high-level mechanism, introduced in Java 7 within the **java.util.concurrent** package, for parallelizing the execution of a task across multiple processors. It is ideal for simplifying the resolution of problems that can be addressed with a **divide-and-conquer** strategy: problems that can be broken down into similar but smaller sub-problems, each solvable individually, whose partial solutions can then be combined to produce the overall solution.

The main class of the fork/join framework is **ForkJoinPool**, an implementation of **ExecutorService** that provides an efficient pool of threads for executing **ForkJoinTask** instances. Efficient because, as indicated in the API documentation, it uses a **work-stealing** strategy: a thread that has completed its own tasks can take over ("steal") tasks created by another thread that is still busy computing. The fork/join framework is also one of the topics included in the OCPJP (Oracle Certified Professional, Java SE Programmer II) certification exam.

As mentioned, the threads of the ForkJoinPool run instances of ForkJoinTask. This is an abstraction (abstract class) extended in turn by two other abstract classes, **RecursiveAction** and **RecursiveTask**. They distinguish between tasks that need to return a result at the end of their computation (RecursiveTask) and tasks that only have to perform operations without returning any value (RecursiveAction). The difference between the two classes, which must be kept in mind during the OCPJP exam, is the return type of the compute() method used to invoke the task execution.

The method signature for the class RecursiveTask is: **protected abstract V compute()**

For the class RecursiveAction, instead, the signature is: **protected abstract void compute()**
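To make the difference concrete, here is a minimal, hypothetical pair of subclasses (the class and task names are ours, for illustration only): a RecursiveTask<Integer> whose compute() returns a value, and a RecursiveAction whose compute() returns void and only performs a side effect.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;
import java.util.concurrent.RecursiveTask;

public class ComputeSignatures {

    // RecursiveTask: compute() returns a value (here an Integer)
    static class SquareTask extends RecursiveTask<Integer> {
        private final int n;
        SquareTask(int n) { this.n = n; }
        @Override
        protected Integer compute() { return n * n; }
    }

    // RecursiveAction: compute() returns void, it only performs operations
    static class PrintAction extends RecursiveAction {
        private final int n;
        PrintAction(int n) { this.n = n; }
        @Override
        protected void compute() { System.out.println("Processing " + n); }
    }

    public static void main(String[] args) {
        ForkJoinPool pool = new ForkJoinPool();
        int result = pool.invoke(new SquareTask(7)); // invoke() returns the task's value
        System.out.println("Square: " + result);     // prints "Square: 49"
        pool.invoke(new PrintAction(7));             // no value to collect for an action
    }
}
```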

In practice, to implement a solution based on a divide-and-conquer strategy, we define an acceptable size for the problem to be solved by a single thread and, at every invocation, check whether the decomposition of the original problem has reached that size. If it has, the thread can proceed with the calculation of the solution; otherwise, if the problem is still too large and needs to be further decomposed, we subdivide it again and recursively hand the new tasks to other threads. When the various threads have completed their calculations on the different sub-problems, we combine the results to build the overall solution to the original problem.

To do this we use the fork() method, provided by the abstract class ForkJoinTask, to actually start the thread that will execute the task on one sub-problem. Then, in the current thread, we invoke the compute() method to run the task on the other sub-problem, and finally we combine this result with the one produced by the previously forked task by calling its join() method, which tells the current thread to wait for its termination.

But let us see a concrete example, which is perhaps worth more than a thousand explanations.

In the following example we apply, through the use of the fork/join framework, a divide-and-conquer strategy to solve the problem of **finding the number of occurrences of an element X in an array of integers**.

One thing to remember for the OCPJP examination, before proceeding with the example, is that a ForkJoinPool can be instantiated by passing to its constructor an integer representing the degree of parallelism that we want to use. If the no-argument constructor is invoked and that value is therefore not provided, the ForkJoinPool is created by default with a degree of parallelism equal to the number of processors available on the machine, obtainable with a call to **Runtime.getRuntime().availableProcessors()**.
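As a quick sketch of this exam point, the two ways of sizing the pool can be compared directly; the class name below is ours, for illustration only.

```java
import java.util.concurrent.ForkJoinPool;

public class PoolParallelism {
    public static void main(String[] args) {
        int procs = Runtime.getRuntime().availableProcessors();

        // Explicit parallelism level passed to the constructor
        ForkJoinPool explicitPool = new ForkJoinPool(procs);

        // No-arg constructor: defaults to the number of available processors
        ForkJoinPool defaultPool = new ForkJoinPool();

        System.out.println("Available processors: " + procs);
        System.out.println("Explicit pool parallelism: " + explicitPool.getParallelism());
        System.out.println("Default pool parallelism: " + defaultPool.getParallelism());
        // The last two values coincide, confirming the default described above
    }
}
```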

In the following example, we explicitly get the number of available processors through the above method and use that value when creating the ForkJoinPool, even though in this case we could have just created it with the default constructor. Then we calculate the maximum size of the problem addressable by a single thread, dividing the total number of elements in the array by the number of available processors. At this point we start, within the ForkJoinPool and through the invoke() method, the task representing the initial problem, with size equal to the length of the entire array. Then we recursively subdivide the problem in half, down to the size resolvable by individual threads, which then calculate the partial solutions.

import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

public class ForkJoinFindXOccurrences {

    public static int numOfThreads;
    public static final int[] SEARCH_ARRAY = { 5, 1, 3, 1, 20, 7, 8, 4, 22, 1,
            45, 21, 9, 11, 34, 10, 14, 1 };
    public static final int TO_FIND = 1;

    public static void main(final String[] args) {
        // getting the number of processors available on the machine
        System.out.println("Number of processor: " + Runtime.getRuntime().availableProcessors());
        numOfThreads = Runtime.getRuntime().availableProcessors();
        System.out.println("Number of array elements: " + SEARCH_ARRAY.length);
        // we calculate the dimension of blocks to which we break down the problem
        int dim = (int) Math.ceil((double) SEARCH_ARRAY.length / numOfThreads);
        System.out.println("Block Dimension: " + dim);
        // we create a fork-join pool with the number of available processors
        // in this case, being the number of processors the default value,
        // we could have just invoked the no-args constructor
        final ForkJoinPool pool = new ForkJoinPool(numOfThreads);
        // we start the computation through the pool
        final int n = SEARCH_ARRAY.length;
        final int numOfOcc = pool.invoke(new RecursiveCountOfX(0, n - 1, dim));
        // we print the FINAL result
        System.out.printf("[" + Thread.currentThread().getName()
                + "] Total number of occurrences of the element " + TO_FIND + ": " + numOfOcc);
    }

    static class RecursiveCountOfX extends RecursiveTask<Integer> {

        int start, end, dim;

        // "start" and "end" are the edges of the range where to look for the element
        // "dim" represents the dimension of blocks to process
        public RecursiveCountOfX(final int start, final int end, final int dim) {
            this.start = start;
            this.end = end;
            this.dim = dim;
        }

        @Override
        public Integer compute() {
            // if the number of elements is lower than the addressable size we process this range
            if ((this.end - this.start) < this.dim) {
                int ris = 0;
                for (int i = this.start; i <= this.end; i++) {
                    if (SEARCH_ARRAY[i] == TO_FIND) {
                        ris++;
                    }
                }
                // we show the partial result of the sub-problem
                // and the name of the thread that has calculated it
                System.out.println("\t [" + Thread.currentThread().getName() + "]: " + ris
                        + " occurrences of " + TO_FIND + " within the range from "
                        + this.start + " to " + this.end);
                return ris;
            }
            // otherwise, if the range is still too big, we divide it again in half
            final int mid = (this.start + this.end) / 2;
            System.out.printf("Fork the computation in two ranges: "
                    + "from %d to %d and from %d to %d %n", this.start, mid, mid + 1, this.end);
            // we assign the calculation of the first half to a task
            final RecursiveCountOfX subTask1 = new RecursiveCountOfX(this.start, mid, this.dim);
            // and we invoke it, forking a new task
            subTask1.fork();
            // we assign the calculation of the second half to another task
            final RecursiveCountOfX subTask2 = new RecursiveCountOfX(mid + 1, this.end, this.dim);
            // and we calculate it
            final int resultSecond = subTask2.compute();
            // we wait for termination of the forked task and we put results together
            return subTask1.join() + resultSecond;
        }
    }
}

The code of the program is extensively commented, in order to describe in detail every step of its implementation.

The output generated is listed below. It shows the ranges into which the problem has been split, the thread that took over the computation of each range, the partial results calculated on the sub-problems, and the final solution obtained by aggregating the solutions of the sub-problems.

Number of processor: 4
Number of array elements: 18
Block Dimension: 5
Fork the computation in two ranges: from 0 to 8 and from 9 to 17
Fork the computation in two ranges: from 9 to 13 and from 14 to 17
Fork the computation in two ranges: from 0 to 4 and from 5 to 8
    [ForkJoinPool-1-worker-3]: 1 occurrences of 1 within the range from 9 to 13
    [ForkJoinPool-1-worker-1]: 1 occurrences of 1 within the range from 14 to 17
    [ForkJoinPool-1-worker-3]: 2 occurrences of 1 within the range from 0 to 4
    [ForkJoinPool-1-worker-2]: 0 occurrences of 1 within the range from 5 to 8
[main] Total number of occurrences of the element 1: 4

Ok, we made our first program using the fork/join framework.

Now let’s add some further considerations, some directly connected with the OCPJP exam, others less so.

We first note that the 4 subtasks into which the original problem was divided have actually been processed by 3 different threads, or **workers** as they are called by the ForkJoinPool, with worker-3 processing 2 of them.

This is because, since the computational load required to complete a task was almost nothing, a worker finished quickly and was able to take charge of another sub-problem, avoiding the allocation of a new worker for that purpose. To verify that the number of workers allocated can reach the number specified when creating the ForkJoinPool, 4 in our case, we introduce a sleep operation to increase the time a single worker spends resolving a task.

So we modify the code, adding a call to **Thread.sleep(1000)** before returning the result of the workers’ computation, which introduces a wait of one second (the parameter of sleep is expressed in milliseconds) before the termination of the task.

import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

public class ForkJoinFindXOccurrences {

    public static int numOfThreads;
    public static final int[] SEARCH_ARRAY = { 5, 1, 3, 1, 20, 7, 8, 4, 22, 1,
            45, 21, 9, 11, 34, 10, 14, 1 };
    public static final int TO_FIND = 1;

    public static void main(final String[] args) {
        // getting the number of processors available on the machine
        System.out.println("Number of processor: " + Runtime.getRuntime().availableProcessors());
        numOfThreads = Runtime.getRuntime().availableProcessors();
        System.out.println("Number of array elements: " + SEARCH_ARRAY.length);
        // we calculate the dimension of blocks to which we break down the problem
        int dim = (int) Math.ceil((double) SEARCH_ARRAY.length / numOfThreads);
        System.out.println("Block Dimension: " + dim);
        // we create a fork-join pool with the number of available processors
        // in this case, being the number of processors the default value,
        // we could have just invoked the no-args constructor
        final ForkJoinPool pool = new ForkJoinPool(numOfThreads);
        // we start the computation through the pool
        final int n = SEARCH_ARRAY.length;
        final int numOfOcc = pool.invoke(new RecursiveCountOfX(0, n - 1, dim));
        // we print the FINAL result
        System.out.printf("[" + Thread.currentThread().getName()
                + "] Total number of occurrences of the element " + TO_FIND + ": " + numOfOcc);
    }

    static class RecursiveCountOfX extends RecursiveTask<Integer> {

        int start, end, dim;

        // "start" and "end" are the edges of the range where to look for the element
        // "dim" represents the dimension of blocks to process
        public RecursiveCountOfX(final int start, final int end, final int dim) {
            this.start = start;
            this.end = end;
            this.dim = dim;
        }

        @Override
        public Integer compute() {
            // if the number of elements is lower than the addressable size we process this range
            if ((this.end - this.start) < this.dim) {
                int ris = 0;
                for (int i = this.start; i <= this.end; i++) {
                    if (SEARCH_ARRAY[i] == TO_FIND) {
                        ris++;
                    }
                }
                // we show the partial result of the sub-problem
                // and the name of the thread that has calculated it
                System.out.println("\t [" + Thread.currentThread().getName() + "]: " + ris
                        + " occurrences of " + TO_FIND + " within the range from "
                        + this.start + " to " + this.end);
                try {
                    // we simulate a bigger effort for the resolution of the task
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                return ris;
            }
            // otherwise, if the range is still too big, we divide it again in half
            final int mid = (this.start + this.end) / 2;
            System.out.printf("Fork the computation in two ranges: "
                    + "from %d to %d and from %d to %d %n", this.start, mid, mid + 1, this.end);
            // we assign the calculation of the first half to a task
            final RecursiveCountOfX subTask1 = new RecursiveCountOfX(this.start, mid, this.dim);
            // and we invoke it, forking a new task
            subTask1.fork();
            // we assign the calculation of the second half to another task
            final RecursiveCountOfX subTask2 = new RecursiveCountOfX(mid + 1, this.end, this.dim);
            // and we calculate it
            final int resultSecond = subTask2.compute();
            // we wait for termination of the forked task and we put results together
            return subTask1.join() + resultSecond;
        }
    }
}

Here is the output of the new run, where we can verify the use of four different workers for the resolution of the 4 sub-problems into which we divided the original problem. This time worker-4, which was not needed in the previous execution, has also been used.

Number of processor: 4
Number of array elements: 18
Block Dimension: 5
Fork the computation in two ranges: from 0 to 8 and from 9 to 17
Fork the computation in two ranges: from 9 to 13 and from 14 to 17
Fork the computation in two ranges: from 0 to 4 and from 5 to 8
    [ForkJoinPool-1-worker-3]: 1 occurrences of 1 within the range from 9 to 13
    [ForkJoinPool-1-worker-1]: 1 occurrences of 1 within the range from 14 to 17
    [ForkJoinPool-1-worker-2]: 0 occurrences of 1 within the range from 5 to 8
    [ForkJoinPool-1-worker-4]: 2 occurrences of 1 within the range from 0 to 4
[main] Total number of occurrences of the element 1: 4

Another thing that is worth analyzing, and that deserves attention during the OCPJP exam, is the order of the calls to the fundamental methods of the fork/join framework.

Let’s see why.

Let’s start by analyzing the following lines of code, where we assemble the partial results of the two sub-problems solved:

final int resultSecond = subTask2.compute();
// we wait for termination of the forked thread and we put results together
return subTask1.join() + resultSecond;

In this case, it would not be necessary to assign the result of the computation of subTask2 to a dedicated variable; we could use the value directly in the return statement. And here is where things can become dangerous, because we **must ensure that the invocation of the compute() method on subTask2 is done before the join() call on subTask1**.

So you have to make sure that the order of the method invocations is as follows:

// first we start the computation, and only then can we call join on the other task
return subTask2.compute() + subTask1.join();

For brevity we don’t report the entire code. The result of the execution of the program, with the results assembled by the line of code listed above, is in line with the one previously obtained (obviously, we just avoid using the intermediate variable):

Number of processor: 4
Number of array elements: 18
Block Dimension: 5
Fork the computation in two ranges: from 0 to 8 and from 9 to 17
Fork the computation in two ranges: from 9 to 13 and from 14 to 17
Fork the computation in two ranges: from 0 to 4 and from 5 to 8
    [ForkJoinPool-1-worker-3]: 1 occurrences of 1 within the range from 9 to 13
    [ForkJoinPool-1-worker-1]: 1 occurrences of 1 within the range from 14 to 17
    [ForkJoinPool-1-worker-2]: 0 occurrences of 1 within the range from 5 to 8
    [ForkJoinPool-1-worker-4]: 2 occurrences of 1 within the range from 0 to 4
[main] Total number of occurrences of the element 1: 4

But what if we reverse the order of the compute() and join() calls? We would wait for the termination of the other thread WITHOUT starting the calculation in the current thread in parallel. So basically, **we are negating the effects of parallel processing and just proceeding sequentially, with a single thread**.

We can easily verify that. Let’s replace the return statement, reversing the calls to compute() and join() in this way:

return subTask1.join() + subTask2.compute();

Running the program again, the result obtained is the following:

Number of processor: 4
Number of array elements: 18
Block Dimension: 5
Fork the computation in two ranges: from 0 to 8 and from 9 to 17
Fork the computation in two ranges: from 0 to 4 and from 5 to 8
    [ForkJoinPool-1-worker-1]: 2 occurrences of 1 within the range from 0 to 4
    [ForkJoinPool-1-worker-1]: 0 occurrences of 1 within the range from 5 to 8
Fork the computation in two ranges: from 9 to 13 and from 14 to 17
    [ForkJoinPool-1-worker-1]: 1 occurrences of 1 within the range from 9 to 13
    [ForkJoinPool-1-worker-1]: 1 occurrences of 1 within the range from 14 to 17
[main] Total number of occurrences of the element 1: 4

From the output generated we see that **ALL the subproblems have been calculated by the same worker!** So our program did not run concurrently but in single-threaded mode.

We also notice that the final result does not change. We do not get a deadlock or other concurrency-related problems, nor an incorrect or unpredictable result. What we get is the same correct result, but calculated with worse performance, because multithreading is not exploited.

So, once again, it is important to pay attention to this potential pitfall and to know how the program behaves in this case.
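As a side note, ForkJoinTask also provides a static invokeAll() method that forks and joins a group of tasks in the correct order internally, sidestepping this pitfall entirely. Here is a minimal, self-contained sketch of the same combination step using it; the SumTask class is ours, for illustration only.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

public class InvokeAllDemo {

    // A tiny illustrative task: sums the integers in the range [start, end]
    static class SumTask extends RecursiveTask<Integer> {
        final int start, end;
        SumTask(int start, int end) { this.start = start; this.end = end; }

        @Override
        protected Integer compute() {
            if (end - start < 4) {              // small enough: sum directly
                int sum = 0;
                for (int i = start; i <= end; i++) sum += i;
                return sum;
            }
            int mid = (start + end) / 2;
            SumTask left = new SumTask(start, mid);
            SumTask right = new SumTask(mid + 1, end);
            // invokeAll forks and completes both subtasks in the correct order,
            // so the compute()/join() ordering mistake cannot occur
            invokeAll(left, right);
            return left.join() + right.join();  // both already completed here
        }
    }

    public static void main(String[] args) {
        int total = new ForkJoinPool().invoke(new SumTask(1, 10));
        System.out.println("Sum 1..10 = " + total); // prints "Sum 1..10 = 55"
    }
}
```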

One more thing worth a couple of words, although not closely linked to the fork/join framework, is the method we used to calculate the acceptable size of a task.

The line of code:

// we calculate the dimension of blocks to which we break down the problem
int dim = (int) Math.ceil((double) SEARCH_ARRAY.length / numOfThreads);

rounds the division up to the next integer value. This helps us define a size for the sub-problems that can be handled by the number of threads available. In this case, with 4 threads and 18 elements to be analyzed, we need to create intervals of 5 elements.

If we had simply used integer division, we would have gotten blocks of 4 elements, and it would not have been possible to manage them with only four tasks. In that case we would have gotten further subdivisions into sub-problems, and more tasks created and invoked.
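The effect of the two divisions on our numbers (18 elements, 4 threads) can be checked in isolation; this small sketch, with a class name of our own choosing, just reproduces the arithmetic.

```java
public class BlockSize {
    public static void main(String[] args) {
        int elements = 18;
        int threads = 4;

        // Ceiling division: rounds up, so 4 blocks of at most 5 elements suffice
        int dimCeil = (int) Math.ceil((double) elements / threads);

        // Integer division: truncates, giving blocks of 4 that force extra splits
        int dimFloor = elements / threads;

        System.out.println("Ceil block size: " + dimCeil);   // prints 5
        System.out.println("Floor block size: " + dimFloor); // prints 4
    }
}
```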

Again we can verify that by eliminating the call to the static method **Math.ceil()** in the calculation of the size of the sub-problems, which then becomes:

int dim = SEARCH_ARRAY.length / numOfThreads;

Running the program we get the following output:

Number of processor: 4
Number of array elements: 18
Block Dimension: 4
Fork the computation in two ranges: from 0 to 8 and from 9 to 17
Fork the computation in two ranges: from 9 to 13 and from 14 to 17
Fork the computation in two ranges: from 0 to 4 and from 5 to 8
Fork the computation in two ranges: from 9 to 11 and from 12 to 13
    [ForkJoinPool-1-worker-1]: 1 occurrences of 1 within the range from 14 to 17
    [ForkJoinPool-1-worker-3]: 0 occurrences of 1 within the range from 12 to 13
Fork the computation in two ranges: from 0 to 2 and from 3 to 4
    [ForkJoinPool-1-worker-4]: 1 occurrences of 1 within the range from 3 to 4
    [ForkJoinPool-1-worker-4]: 1 occurrences of 1 within the range from 0 to 2
    [ForkJoinPool-1-worker-2]: 0 occurrences of 1 within the range from 5 to 8
    [ForkJoinPool-1-worker-1]: 1 occurrences of 1 within the range from 9 to 11
[main] Total number of occurrences of the element 1: 4

and we can notice that:

– the “Block Dimension” line now has value 4 instead of 5

– the number of sub-problems into which the original problem has been divided is 6, not 4 as before

– worker-4 and worker-1 solve 2 tasks each
