# A brief overview of the Fork/Join Framework in Java

### Introduction

The Fork/Join framework solves problems using a concurrent divide-and-conquer approach. It was introduced in Java 7 to complement the existing concurrency API. Before its introduction, the existing ExecutorService implementations were the popular choice for running asynchronous tasks, but they work best when the tasks are homogeneous and independent. Running dependent tasks and combining their results with those implementations was not easy. The Fork/Join framework is an attempt to address this shortcoming. In this post, we will take a brief look at the API and solve a couple of simple problems to understand how it works.

### Solving a non-blocking task

Let’s jump directly into code. Let’s create a task which would return the sum of all elements of a List. The following steps represent our algorithm in pseudo-code:

1. Find the middle index of the list
2. Divide the list in the middle
3. Recursively create a new task which will compute the sum of the left part
4. Recursively create a new task which will compute the sum of the right part
5. Add the result of the left sum, the middle element, and the right sum

Here is the code:

```java
import java.util.List;
import java.util.concurrent.RecursiveTask;

import lombok.extern.slf4j.Slf4j;

@Slf4j
public class ListSummer extends RecursiveTask<Integer> {
  private final List<Integer> listToSum;

  ListSummer(List<Integer> listToSum) {
    this.listToSum = listToSum;
  }

  @Override
  protected Integer compute() {
    // Base case: an empty list sums to zero.
    if (listToSum.isEmpty()) {
      log.info("Found empty list, sum is 0");
      return 0;
    }

    int middleIndex = listToSum.size() / 2;
    log.info("List {}, middle Index: {}", listToSum, middleIndex);

    // The middle element is excluded from both halves and added back below.
    List<Integer> leftSublist = listToSum.subList(0, middleIndex);
    List<Integer> rightSublist = listToSum.subList(middleIndex + 1, listToSum.size());

    ListSummer leftSummer = new ListSummer(leftSublist);
    ListSummer rightSummer = new ListSummer(rightSublist);

    // Schedule both halves for asynchronous execution.
    leftSummer.fork();
    rightSummer.fork();

    // Wait for both partial sums.
    Integer leftSum = leftSummer.join();
    Integer rightSum = rightSummer.join();
    int total = leftSum + listToSum.get(middleIndex) + rightSum;
    log.info("Left sum is {}, right sum is {}, total is {}", leftSum, rightSum, total);

    return total;
  }
}
```

Firstly, we extend RecursiveTask, a subtype of ForkJoinTask. This is the type to extend when we expect our concurrent task to return a result. When a task does not return a result but only performs a side effect, we extend the RecursiveAction subtype instead. For most practical tasks, these two subtypes are sufficient.
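To make the difference between the two subtypes concrete, here is a minimal sketch of a RecursiveAction. It is not part of the original examples: the `ArrayDoubler` class, its `doubleAll` helper, and the threshold value are all hypothetical and chosen only for illustration. The task doubles every element of an array in place, so there is nothing to return.

```java
import java.util.Arrays;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;

// Hypothetical example: doubles every element of an int array in place.
class ArrayDoubler extends RecursiveAction {
  private static final int THRESHOLD = 4; // arbitrary cut-off chosen for illustration

  private final int[] values;
  private final int from; // inclusive
  private final int to;   // exclusive

  ArrayDoubler(int[] values, int from, int to) {
    this.values = values;
    this.from = from;
    this.to = to;
  }

  @Override
  protected void compute() {
    // Small ranges are processed directly; there is no result to return.
    if (to - from <= THRESHOLD) {
      for (int i = from; i < to; i++) {
        values[i] *= 2;
      }
      return;
    }
    int middle = from + (to - from) / 2;
    // invokeAll runs both halves and returns once both have completed.
    invokeAll(new ArrayDoubler(values, from, middle), new ArrayDoubler(values, middle, to));
  }

  // Hypothetical helper: works on a copy so the caller's array is left untouched.
  static int[] doubleAll(int[] input) {
    int[] copy = Arrays.copyOf(input, input.length);
    ForkJoinPool pool = new ForkJoinPool();
    try {
      pool.invoke(new ArrayDoubler(copy, 0, copy.length));
    } finally {
      pool.shutdown();
    }
    return copy;
  }

  public static void main(String[] args) {
    // prints [2, 4, 6, 8, 10, 12, 14, 16, 18]
    System.out.println(Arrays.toString(doubleAll(new int[] {1, 2, 3, 4, 5, 6, 7, 8, 9})));
  }
}
```

Because the two halves write to disjoint index ranges of the same array, the subtasks do not need any extra synchronization.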

Secondly, both RecursiveTask and RecursiveAction define an abstract compute method. This is where we put our computation.

Thirdly, inside our compute method, we check the size of the list that is passed through the constructor. If it is empty, we already know that the sum is zero, and we return immediately. Otherwise, we split the list around its middle element into two sublists and create two instances of our ListSummer type. We then call the fork() method (defined in ForkJoinTask) on these two instances:

```java
leftSummer.fork();
rightSummer.fork();
```

These calls schedule the tasks for asynchronous execution; the exact mechanism used for this purpose is explained later in this post.

After that, we invoke the join() method (also defined in ForkJoinTask) to wait for the results of these two parts:

```java
Integer leftSum = leftSummer.join();
Integer rightSum = rightSummer.join();
```

The two partial sums are then added to the middle element of the list to get the final result.
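A common variation on this pattern is to fork only one half and compute the other half directly in the current thread, and to stop splitting once the list is small enough to sum sequentially. The sketch below shows that variation. It is not the code from this post: `ThresholdListSummer`, its `sum` helper, and the threshold value are hypothetical, and a sensible threshold would have to be found by measuring.

```java
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// Hypothetical variant of ListSummer: forks only the left half and
// sums small lists sequentially instead of splitting down to empty lists.
class ThresholdListSummer extends RecursiveTask<Integer> {
  private static final int THRESHOLD = 4; // illustrative value, tune by measuring

  private final List<Integer> listToSum;

  ThresholdListSummer(List<Integer> listToSum) {
    this.listToSum = listToSum;
  }

  @Override
  protected Integer compute() {
    // Small lists are cheaper to sum directly than to split further.
    if (listToSum.size() <= THRESHOLD) {
      int sum = 0;
      for (int value : listToSum) {
        sum += value;
      }
      return sum;
    }

    int middleIndex = listToSum.size() / 2;
    ThresholdListSummer left =
        new ThresholdListSummer(listToSum.subList(0, middleIndex));
    ThresholdListSummer right =
        new ThresholdListSummer(listToSum.subList(middleIndex, listToSum.size()));

    left.fork();                    // schedule the left half asynchronously
    int rightSum = right.compute(); // work on the right half in the current thread
    int leftSum = left.join();      // then wait for the left half
    return leftSum + rightSum;
  }

  // Hypothetical helper that runs the task on a dedicated pool.
  static int sum(List<Integer> values) {
    ForkJoinPool pool = new ForkJoinPool();
    try {
      return pool.invoke(new ThresholdListSummer(values));
    } finally {
      pool.shutdown();
    }
  }

  public static void main(String[] args) {
    System.out.println(sum(List.of(1, 2, 3, 4, 5, 6, 7, 8, 9))); // prints 45
  }
}
```

Computing one half directly keeps the current worker busy instead of having it wait in join() right after forking, and the threshold avoids creating a task for every single element.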

Plenty of log messages have been added to make the example easier to understand. However, when we process a list containing thousands of entries, it might not be a good idea to have this detailed logging, especially logging the entire list.

That’s pretty much it. Let’s create a test class now for a test run:

```java
import static org.assertj.core.api.Assertions.assertThat;

import java.util.List;
import java.util.concurrent.ForkJoinPool;

import org.junit.jupiter.api.Test; // assumes JUnit 5; with JUnit 4 use org.junit.Test

public class ListSummerTest {

  @Test
  public void shouldSumEmptyList() {
    ListSummer summer = new ListSummer(List.of());
    ForkJoinPool forkJoinPool = new ForkJoinPool();
    forkJoinPool.submit(summer);

    int result = summer.join();

    assertThat(result).isZero();
  }

  @Test
  public void shouldSumListWithOneElement() {
    ListSummer summer = new ListSummer(List.of(5));
    ForkJoinPool forkJoinPool = new ForkJoinPool();
    forkJoinPool.submit(summer);

    int result = summer.join();

    assertThat(result).isEqualTo(5);
  }

  @Test
  public void shouldSumListWithMultipleElements() {
    ListSummer summer = new ListSummer(List.of(
        1, 2, 3, 4, 5, 6, 7, 8, 9
    ));
    ForkJoinPool forkJoinPool = new ForkJoinPool();
    forkJoinPool.submit(summer);

    int result = summer.join();

    assertThat(result).isEqualTo(45);
  }
}
```

In the test, we create an instance of the ForkJoinPool. A ForkJoinPool is a specialized ExecutorService implementation for running ForkJoinTasks. It employs a scheduling technique known as work stealing. Unlike executors such as ThreadPoolExecutor, where all workers share a single queue of pending tasks, in a work-stealing implementation each worker thread gets its own work queue. Each thread starts by executing tasks from its own queue.
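The tests above use submit() followed by join(). As a rough sketch of the other common ways to hand a task to a pool, the example below uses invoke() on a dedicated pool with an explicit parallelism level, and submit() on the shared common pool. The `PoolUsageDemo` class, the `RangeSum` task, and the threshold are hypothetical names and values introduced only for this illustration.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;

class PoolUsageDemo {
  // Hypothetical task for illustration: sums the integers in [from, to).
  static class RangeSum extends RecursiveTask<Long> {
    private static final int THRESHOLD = 100; // arbitrary cut-off

    private final int from;
    private final int to;

    RangeSum(int from, int to) {
      this.from = from;
      this.to = to;
    }

    @Override
    protected Long compute() {
      if (to - from <= THRESHOLD) {
        long sum = 0;
        for (int i = from; i < to; i++) {
          sum += i;
        }
        return sum;
      }
      int middle = from + (to - from) / 2;
      RangeSum left = new RangeSum(from, middle);
      RangeSum right = new RangeSum(middle, to);
      left.fork();
      return right.compute() + left.join();
    }
  }

  // invoke() hands the task to the pool and blocks until its result is available.
  static long sumWithDedicatedPool(int parallelism, int to) {
    ForkJoinPool pool = new ForkJoinPool(parallelism);
    try {
      return pool.invoke(new RangeSum(0, to));
    } finally {
      pool.shutdown();
    }
  }

  // submit() returns immediately; join() then waits for the result.
  static long sumWithCommonPool(int to) {
    ForkJoinTask<Long> task = ForkJoinPool.commonPool().submit(new RangeSum(0, to));
    return task.join();
  }

  public static void main(String[] args) {
    System.out.println(sumWithDedicatedPool(2, 1_000)); // prints 499500
    System.out.println(sumWithCommonPool(1_000));       // prints 499500
  }
}
```

A dedicated pool gives control over the number of worker threads, while the common pool is shared across the whole JVM and needs no shutdown.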

When we detect that a ForkJoinTask can be broken down into smaller subtasks, we split it and invoke the fork() method on those subtasks. This invocation pushes the subtasks onto the executing thread’s queue. During execution, when a thread exhausts its own queue and has nothing left to execute, it can “steal” tasks from another thread’s queue (hence the name “work stealing”). This keeps idle threads busy and typically results in better throughput for recursive, divide-and-conquer workloads than an executor with a single shared queue.
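One way to get a feel for this behaviour is to record which worker threads end up running the leaf tasks and to ask the pool for its steal count. The following is only a sketch under assumptions: `WorkStealingDemo`, `TracedRangeSum`, the leaf size, and the pool size of 4 are hypothetical choices, and the printed thread names and steal count will vary from run to run.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

class WorkStealingDemo {
  // Hypothetical task: sums [from, to) and records which thread ran each leaf.
  static class TracedRangeSum extends RecursiveTask<Long> {
    private static final int LEAF_SIZE = 1_000; // arbitrary cut-off

    private final int from;
    private final int to;
    private final Set<String> workers;

    TracedRangeSum(int from, int to, Set<String> workers) {
      this.from = from;
      this.to = to;
      this.workers = workers;
    }

    @Override
    protected Long compute() {
      if (to - from <= LEAF_SIZE) {
        workers.add(Thread.currentThread().getName());
        long sum = 0;
        for (int i = from; i < to; i++) {
          sum += i;
        }
        return sum;
      }
      int middle = from + (to - from) / 2;
      TracedRangeSum left = new TracedRangeSum(from, middle, workers);
      TracedRangeSum right = new TracedRangeSum(middle, to, workers);
      left.fork(); // pushed onto the current worker's queue, where others may steal it
      return right.compute() + left.join();
    }
  }

  // Runs the task on a pool of 4 workers and fills `workers` with thread names.
  static long run(Set<String> workers) {
    ForkJoinPool pool = new ForkJoinPool(4);
    try {
      long sum = pool.invoke(new TracedRangeSum(0, 1_000_000, workers));
      // getStealCount() is documented as an estimate, so treat it as indicative only.
      System.out.println("approximate steal count: " + pool.getStealCount());
      return sum;
    } finally {
      pool.shutdown();
    }
  }

  public static void main(String[] args) {
    Set<String> workers = ConcurrentHashMap.newKeySet();
    System.out.println("sum = " + run(workers));
    System.out.println("threads that ran leaf tasks: " + workers);
  }
}
```

If more than one thread name shows up, some of the forked subtasks were picked up by workers other than the one that created them.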

Earlier, when we invoked fork() on our leftSummer and rightSummer task instances, they were pushed onto the work queue of the executing thread. From there they could be “stolen” by other threads in the pool that had nothing else to do at that point, and the same applies to the subtasks they forked in turn.

Pretty cool, right?

### Solving a blocking task

The problem we solved just now is non-blocking in nature. If we want to solve a problem which does some blocking operation, then to have a better throughput we will need to change our strategy.

Let’s examine this with another example. Let’s say we want to create a very simple web crawler. This crawler will receive a list of HTTP links, execute GET requests to fetch the response bodies, and then calculate the length of each response. Here is the code:

```java
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

import lombok.extern.slf4j.Slf4j;
// Apache HttpClient 4.x package names are assumed here.
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.util.EntityUtils;

@Slf4j
public class ResponseLengthCalculator extends RecursiveTask<Map<String, Integer>> {
  private final List<String> links;

  ResponseLengthCalculator(List<String> links) {
    this.links = links;
  }

  @Override
  protected Map<String, Integer> compute() {
    if (links.isEmpty()) {
      log.info("No more links to fetch");
      return Collections.emptyMap();
    }

    int middle = links.size() / 2;
    log.info("Links {}, middle index: {}", links, middle);
    ResponseLengthCalculator leftPartition = new ResponseLengthCalculator(links.subList(0, middle));
    ResponseLengthCalculator rightPartition = new ResponseLengthCalculator(links.subList(middle + 1, links.size()));

    log.info("Forking left partition");
    leftPartition.fork();
    log.info("Left partition forked, now forking right partition");
    rightPartition.fork();
    log.info("Right partition forked");

    // The middle link is fetched by this task while the two halves run asynchronously.
    String middleLink = links.get(middle);
    HttpRequester httpRequester = new HttpRequester(middleLink);
    String response;
    try {
      log.info("Calling managedBlock for {}", middleLink);
      ForkJoinPool.managedBlock(httpRequester);
      response = httpRequester.response;
    } catch (InterruptedException ex) {
      log.error("Error occurred while trying to implement blocking link fetcher", ex);
      Thread.currentThread().interrupt(); // preserve the interrupt status for callers
      response = "";
    }

    Map<String, Integer> responseMap = new HashMap<>(links.size());

    Map<String, Integer> leftLinks = leftPartition.join();
    responseMap.putAll(leftLinks);
    responseMap.put(middleLink, response.length());
    Map<String, Integer> rightLinks = rightPartition.join();
    responseMap.putAll(rightLinks);

    log.info("Left map {}, middle length {}, right map {}", leftLinks, response.length(), rightLinks);

    return responseMap;
  }

  private static class HttpRequester implements ForkJoinPool.ManagedBlocker {
    private final String link;
    private String response;

    private HttpRequester(String link) {
      this.link = link;
    }

    @Override
    public boolean block() {
      HttpGet getRequest = new HttpGet(link);
      // try-with-resources closes the client and the response once we are done.
      try (CloseableHttpClient client = HttpClientBuilder
          .create()
          .disableRedirectHandling()
          .build()) {
        log.info("Executing blocking request for {}", link);
        try (CloseableHttpResponse httpResponse = client.execute(getRequest)) {
          log.info("HTTP request for link {} has been executed", link);
          this.response = EntityUtils.toString(httpResponse.getEntity());
        }
      } catch (IOException e) {
        log.error("Error while trying to fetch response from link {}: {}", link, e.getMessage());
        this.response = "";
      }
      return true; // no further blocking is necessary
    }

    @Override
    public boolean isReleasable() {
      return false; // blocking cannot be skipped, so block() must be called
    }
  }
}
```

We implement ForkJoinPool.ManagedBlocker and put the blocking HTTP call inside it. This interface defines two methods, block() and isReleasable(). The block() method is where we put our blocking call. After we are done with our blocking operation, we return true to indicate that no further blocking is necessary. The isReleasable() method tells the pool whether blocking can be skipped, and it is checked before block() is invoked. We return false from it to indicate that the blocker still needs to block.

Finally, we pass our HttpRequester instance to the static ForkJoinPool.managedBlock() method. This call does not submit a separate task: it runs the blocker on the calling thread. When that thread is a fork-join worker and it blocks on the HTTP request, ForkJoinPool.managedBlock() may also arrange for a spare thread to be activated if necessary to ensure sufficient parallelism.
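To experiment with ManagedBlocker without making network calls, the sketch below replaces the HTTP request with Thread.sleep. Everything in it is hypothetical and not taken from the original code: `ManagedBlockDemo`, `SleepingBlocker`, `LengthTask`, and the 50 ms delay are stand-ins. Unlike the HttpRequester above, this blocker returns true from isReleasable() once its result is available, which lets the pool skip blocking that is no longer needed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

class ManagedBlockDemo {
  // Hypothetical blocker: Thread.sleep stands in for a blocking call such as an HTTP request.
  static class SleepingBlocker implements ForkJoinPool.ManagedBlocker {
    private final String input;
    private volatile Integer result; // null until block() has completed

    SleepingBlocker(String input) {
      this.input = input;
    }

    @Override
    public boolean block() throws InterruptedException {
      if (result == null) {
        Thread.sleep(50); // simulated blocking I/O
        result = input.length();
      }
      return true; // no further blocking is necessary
    }

    @Override
    public boolean isReleasable() {
      return result != null; // true once the result is available
    }
  }

  // Hypothetical task: computes the length of each input, one blocking call per element.
  static class LengthTask extends RecursiveTask<List<Integer>> {
    private final List<String> inputs;

    LengthTask(List<String> inputs) {
      this.inputs = inputs;
    }

    @Override
    protected List<Integer> compute() {
      List<Integer> lengths = new ArrayList<>();
      if (inputs.isEmpty()) {
        return lengths;
      }
      if (inputs.size() == 1) {
        SleepingBlocker blocker = new SleepingBlocker(inputs.get(0));
        try {
          // Runs the blocker on this thread; the pool may add a spare worker meanwhile.
          ForkJoinPool.managedBlock(blocker);
          lengths.add(blocker.result);
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt(); // give up on this element
        }
        return lengths;
      }
      int middle = inputs.size() / 2;
      LengthTask left = new LengthTask(inputs.subList(0, middle));
      LengthTask right = new LengthTask(inputs.subList(middle, inputs.size()));
      left.fork();
      List<Integer> rightLengths = right.compute();
      lengths.addAll(left.join());
      lengths.addAll(rightLengths);
      return lengths;
    }
  }

  static List<Integer> lengths(List<String> inputs) {
    ForkJoinPool pool = new ForkJoinPool();
    try {
      return pool.invoke(new LengthTask(inputs));
    } finally {
      pool.shutdown();
    }
  }

  public static void main(String[] args) {
    System.out.println(lengths(List.of("a", "bb", "ccc"))); // prints [1, 2, 3]
  }
}
```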

Let’s take this implementation for a test drive then! Here’s the code:

```java
import static org.assertj.core.api.Assertions.assertThat;

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ForkJoinPool;

import org.junit.jupiter.api.Test; // assumes JUnit 5; with JUnit 4 use org.junit.Test

public class ResponseLengthCalculatorTest {

  @Test
  public void shouldReturnEmptyMapForEmptyList() {
    ResponseLengthCalculator responseLengthCalculator = new ResponseLengthCalculator(Collections.emptyList());
    ForkJoinPool pool = new ForkJoinPool();

    pool.submit(responseLengthCalculator);

    Map<String, Integer> result = responseLengthCalculator.join();
    assertThat(result).isEmpty();
  }

  @Test
  public void shouldHandle200Ok() {
    ResponseLengthCalculator responseLengthCalculator = new ResponseLengthCalculator(List.of(
        "http://httpstat.us/200"
    ));
    ForkJoinPool pool = new ForkJoinPool();

    pool.submit(responseLengthCalculator);

    Map<String, Integer> result = responseLengthCalculator.join();
    assertThat(result)
        .hasSize(1)
        .containsKeys("http://httpstat.us/200")
        .containsValue(0);
  }

  @Test
  public void shouldFetchResponseForDifferentResponseStatus() {
    ResponseLengthCalculator responseLengthCalculator = new ResponseLengthCalculator(List.of(
        "http://httpstat.us/200",
        "http://httpstat.us/302",
        "http://httpstat.us/404",
        "http://httpstat.us/502"
    ));
    ForkJoinPool pool = new ForkJoinPool();

    pool.submit(responseLengthCalculator);

    Map<String, Integer> result = responseLengthCalculator.join();
    assertThat(result)
        .hasSize(4);
  }
}
```

That’s it for today, folks! As always, any feedback/improvement suggestions/comments are highly appreciated!

All the examples discussed here can be found on GitHub (specific commit).

A big shout out to the awesome http://httpstat.us service, it was quite helpful for developing the simple tests.