Java Programming Tutorials


Java Fork Join example

The Java Fork Join Framework uses multiple processors to run divide-and-conquer algorithms in parallel. In this post we’ll show how to get started with Fork Join processing.

ForkJoinPool – Work-stealing ExecutorService

The Fork Join Framework uses a work-stealing ExecutorService: worker threads that have nothing to do can steal tasks from the queues of threads that are still busy. This keeps all available processors working and can speed up your application. The work-stealing algorithm is implemented in ForkJoinPool.
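To make this a bit more concrete, here is a minimal sketch showing that ForkJoinPool can be used like any other ExecutorService and how to check how many workers the common pool targets. The class name PoolBasics and the method answerFromPool are made up for this illustration; the printed numbers depend on your machine.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;

// Hypothetical helper class, not part of the original example.
class PoolBasics {

    // Submits a plain Callable to the common pool and waits for the result.
    static int answerFromPool() throws Exception {
        Future<Integer> future = ForkJoinPool.commonPool()
                .submit((Callable<Integer>) () -> 6 * 7);
        return future.get();
    }

    public static void main(String[] args) throws Exception {
        ForkJoinPool pool = ForkJoinPool.commonPool();
        // Both values are machine-dependent.
        System.out.println("Available processors: "
                + Runtime.getRuntime().availableProcessors());
        System.out.println("Common pool parallelism: "
                + pool.getParallelism());
        System.out.println("Result: " + answerFromPool());
    }
}
```

The common pool is shared by the whole JVM, so there is nothing to create or shut down here.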

ForkJoinTask

Tasks submitted to a ForkJoinPool should be of type ForkJoinTask. Two subclasses of ForkJoinTask are commonly used:

RecursiveAction

It’s a ForkJoinTask that returns no value after processing. We’ll show how to use it in the example below.

RecursiveTask

It’s a ForkJoinTask that returns a value after execution. We can use such tasks when we want to collect (join) the results of all subtasks at the end of processing.
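Since the rest of this post focuses on RecursiveAction, here is a small sketch of a RecursiveTask for comparison. It sums an array of long values by splitting the range in half until it is small enough. The class name SumTask, the helper sum and the THRESHOLD value of 1000 are assumptions made for this illustration, not something prescribed by the framework.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// Hypothetical example class: sums a slice of a long[] with Fork Join.
class SumTask extends RecursiveTask<Long> {

    // Assumed cut-off; tune it for real workloads.
    private static final int THRESHOLD = 1_000;

    private final long[] values;
    private final int from; // inclusive
    private final int to;   // exclusive

    SumTask(long[] values, int from, int to) {
        this.values = values;
        this.from = from;
        this.to = to;
    }

    @Override
    protected Long compute() {
        if (to - from <= THRESHOLD) {
            // Small enough: sum the slice directly.
            long sum = 0;
            for (int i = from; i < to; ++i) {
                sum += values[i];
            }
            return sum;
        }
        int mid = from + (to - from) / 2;
        SumTask left = new SumTask(values, from, mid);
        SumTask right = new SumTask(values, mid, to);
        left.fork();                     // schedule the left half asynchronously
        long rightSum = right.compute(); // process the right half in this thread
        return left.join() + rightSum;   // wait for the left half and combine
    }

    // Convenience entry point that runs the task in the common pool.
    static long sum(long[] values) {
        return ForkJoinPool.commonPool()
                .invoke(new SumTask(values, 0, values.length));
    }

    public static void main(String[] args) {
        long[] values = new long[10_000];
        for (int i = 0; i < values.length; ++i) {
            values[i] = i + 1;
        }
        System.out.println(sum(values)); // 1 + 2 + ... + 10000 = 50005000
    }
}
```

Calling fork() on one half and compute() on the other keeps the current thread busy instead of leaving it waiting for both subtasks.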

Task structure in Fork Join Framework

Tasks processed in Java Fork Join should follow the same basic structure:

  1. If the task is small enough (below some threshold) – do it.
  2. Otherwise split the task into smaller pieces, run them in the ForkJoinPool and wait for them to finish.

So it seems like a good fit for Divide and Conquer algorithms. :-)

Complete Fork Join Example

RecursiveAction with task to run in ForkJoinPool

In the following code we create our own RecursiveAction that takes a Java 8 UnaryOperator and applies it to the items of the given list. If the selected range of items is small enough, the function is applied directly; otherwise we create two subtasks, each of which does the same for one half of the range:

package com.farenda.java.util.concurrent;

import java.util.List;
import java.util.concurrent.RecursiveAction;
import java.util.function.UnaryOperator;

class ApplicatorTask<T> extends RecursiveAction {

    private final UnaryOperator<T> function;
    private final int threshold;
    private final List<T> items;
    private final int from;
    private final int to;

    public ApplicatorTask(UnaryOperator<T> function,
                          int threshold,
                          List<T> items, int from, int to) {
        this.function = function;
        this.threshold = threshold;
        this.items = items;
        this.from = from;
        this.to = to;
    }

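    @Override
    protected void compute() {
        // The range is half-open: 'from' is inclusive, 'to' is exclusive.
        // Assumes threshold >= 2, otherwise a one-element range
        // would never stop splitting.
        if (to - from < threshold) {
            System.out.printf("Computing [%d,%d) in thread %s%n",
                    from, to, Thread.currentThread().getName());
            for (int i = from; i < to; ++i) {
                items.set(i, function.apply(items.get(i)));
            }
        } else {
            // Split at the midpoint; the right half starts at 'mid'
            // (not mid+1) so that no index is skipped.
            int mid = from + (to - from) / 2;
            invokeAll(newTask(from, mid), newTask(mid, to));
        }
    }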

    private ApplicatorTask<T> newTask(int start, int end) {
        return new ApplicatorTask<>(
                function, threshold, items, start, end);
    }
}

Note the use of invokeAll(task1, task2). It forks the subtasks so that they can run in the ForkJoinPool and returns only after both of them have completed.

Starting RecursiveAction in common ForkJoinPool

This program creates a list of Double numbers and calculates the sine of each of them using the Java Fork Join Framework. Because we don’t create our own ForkJoinPool, the program uses the common ForkJoinPool:

package com.farenda.java.util.concurrent;

import java.util.List;
import java.util.stream.IntStream;

import static java.util.stream.Collectors.toList;

public class ForkJoinPoolExample {

    public static void main(String[] args) {
        int threshold = 10;
        List<Double> numbers = range(128);
        System.out.println("Numbers: " + numbers);

        new ApplicatorTask<>(
                Math::sin, threshold,
                numbers, 0, numbers.size())
                .invoke();

        System.out.println("Sinuses: " + numbers);
    }

    private static List<Double> range(int to) {
        return IntStream.range(1, to)
                .asDoubleStream().boxed()
                .collect(toList());
    }
}

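On my two-core machine a run of this program printed output like the following. The order of the lines and the thread names vary from run to run, and the printed range boundaries depend on how compute() splits the range: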

Numbers: [1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, ...]
Computing [0,7] in thread main
Computing [8,15] in thread main
Computing [16,23] in thread main
Computing [24,31] in thread main
Computing [32,39] in thread main
Computing [40,47] in thread main
Computing [48,55] in thread main
Computing [56,63] in thread main
Computing [64,71] in thread main
Computing [72,79] in thread main
Computing [80,87] in thread main
Computing [88,95] in thread main
Computing [96,103] in thread main
Computing [104,111] in thread main
Computing [112,119] in thread ForkJoinPool.commonPool-worker-1
Computing [120,127] in thread ForkJoinPool.commonPool-worker-1
Sinuses: [0.8414709848078965, 0.9092974268256817, ...]

The amount of work was very small, so the distribution of work across threads isn’t impressive, but it is enough to demonstrate what processing in the Fork Join Framework looks like. Also notice that the main thread takes part in processing!
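If you would rather not share the common pool, a task can also be run in a dedicated ForkJoinPool with an explicit parallelism level. The sketch below is only an illustration under a few assumptions: the class DoubleAction, the helper doubleAll and the threshold of 16 are made up for this example, and the task simply doubles every element of an int array.

```java
import java.util.Arrays;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;

// Hypothetical example class: doubles every element of an int[] in place.
class DoubleAction extends RecursiveAction {

    private static final int THRESHOLD = 16; // assumed cut-off

    private final int[] data;
    private final int from; // inclusive
    private final int to;   // exclusive

    DoubleAction(int[] data, int from, int to) {
        this.data = data;
        this.from = from;
        this.to = to;
    }

    @Override
    protected void compute() {
        if (to - from <= THRESHOLD) {
            for (int i = from; i < to; ++i) {
                data[i] *= 2;
            }
        } else {
            int mid = from + (to - from) / 2;
            invokeAll(new DoubleAction(data, from, mid),
                      new DoubleAction(data, mid, to));
        }
    }

    // Runs the action in its own pool and shuts the pool down afterwards.
    static int[] doubleAll(int[] data, int parallelism) {
        ForkJoinPool pool = new ForkJoinPool(parallelism);
        try {
            pool.invoke(new DoubleAction(data, 0, data.length));
        } finally {
            pool.shutdown();
        }
        return data;
    }

    public static void main(String[] args) {
        int[] data = {1, 2, 3, 4, 5};
        System.out.println(Arrays.toString(doubleAll(data, 2)));
    }
}
```

Unlike the common pool, a pool created this way should be shut down once it is no longer needed, which is why the call is wrapped in try/finally.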
