Simplify Concurrency Using ParallelStream

Choosing the right multithreading framework for a Java application can be more difficulty than implementing the solution. Out of the box there are several utilities suitable for parallelizing simple I/O bound tasks such as ExecutorService (Java 1.5), ForkJoinPool (Java 1.7), ParallelStream (Java 1.8), CompletableFuture (Java 1.8), and Spring’s TaskExecutor (Spring 2.0) with its optional @EnableAsync (Spring 3.1) proxying. Any of these tools are more than capable of getting the job done with nearly equivalent performance metrics, and so the most important selection criterion is maintainability. As we’ll see, ParallelStream provides arguably the cleanest syntax for handling basic concurrency.

 Test Scenario

A RESTful microservice that responds to POST requests by performing four database writes and then returning a 200 or 400 HTTP code depending on success execution of the database queries. Based on performance testing it was determined that the service can process 20 simultaneous requests before throughput or latency deteriorate.

int numRequests = 20;
int numDBWritesPerRequest = 4;
int threadPoolSize = numRequests * numDBWritesPerRequest;

 Mocking tasks can be done as follows

void writeToDB() {
  // jdbcTemplate.update(...)
  Thread.sleep(1000);
}

It needs to be said that mocking DB or REST calls using sleep only gets so far. For example, there’s a lot of CPU overhead for setting up and tearing down TCP/IP connections in an HttpConnectionPool.

 ExecutorService

ExecutorService requires quite a bit of coding to handle the test scenario.

boolean handleRequest() {
  ExecutorService threadPool =
          Executors.newFixedThreadPool(threadPoolSize);

  // Initiate the DB writes.
  List<Future<Void>> futures = new ArrayList<>();
  for (int i = 0; i < numDBWritesPerRequest; i++) {
    Future<Void> future = threadPool.submit(() -> {
      writeToDB();
      return null;
    });
    futures.add(future);
  }

  // Determine if the DB writes were successful.
  boolean isSuccess = true;
  try {
    futures.forEach(Future::get);
  } catch (Exception e) {
    isSuccess = false;
  }
  return isSuccess;
}

 ForkJoinPool

ForkJoinPool looks the same as ExecutorService except the thread pool is created using:

new ForkJoinPool(threadPoolSize)
// ForkJoinPool.commonPool()

instead of

Executors.newFixedThreadPool(threadPoolSize)

and so little is gained or lost.

 Spring’s @EnableAsync

Spring applications annotated with @EnableAsync allow for @Async methods that will be be executed asynchronously in Spring’s ThreadPoolTaskExecutor common pool. Configuring the pool size can be done by creating a bean:

@EnableAsync
@Configuration
public class SpringApp {

  @Bean
  public Executor taskExecutor() {
    Executor executor = new ThreadPoolTaskExecutor();
    executor.setCorePoolSize(threadPoolSize);
    return executor;
  }
}

and then exercising the utility as follows:

boolean handleRequest() {
  List<Future<Void>> futures = new ArrayList<>();
  for (int i = 0; i < numberTasks; i++) {
    futures.add(writeToDB());
  }

 boolean isSuccess = true;
  try {
    futures.forEach(Future::get);
  } catch (Exception e) {
    isSuccess = false;
  }
  return isSuccess;
}

@Async
Future<Void> writeToDB() {
  Thread.sleep(1000);
  return new AsyncResult<>(null);
}

Which is an nice way of reducing some code complexity. The biggest caveat is that because of how Spring’s proxying works, the @Async method needs to be the entry point into the class and so in our example the writeToDB method has to be refactored into a separate class. In my experience, this can cause more problems than it is worth as code is refactored into new classes simply to hide the threadPool.submit(...) call.

 ParallelStream

ParallelStream looks something like this:

boolean handleRequest() {
  List<Integer> tasks = asList(0, 1, 2, 3);

  boolean isSuccess = true;
  try {
    tasks.parallelStream().forEach(x -> writeToDB());
  } catch (Exception e) {
    isSuccess = false;
  }
  return isSuccess;
}

We see that ParallelStream removes a lot of complexity that ExecutorService introduced. ParallelStream automatically executes the four database writes concurrently and blocks until all of the tasks are complete. With the other solutions, code had to be written to collect the Future results and call future.get() on them to verify the successful execution of the database queries.

 Configuring the thread pool size of ParallelStream

ParallelStream uses a common fork join pool to execute tasks that has a default size of Runtime.getRuntime().availableProcessors() - 1. In the test scenario it was determined that 80 tasks can be handled at once, and so the common pool size needs to be modified using the java.util.concurrent.ForkJoinPool.common.parallelism property. This can be done as a VM option such as

java -jar -Djava.util.concurrent.ForkJoinPool.common.parallelism=80

or programmatically using

System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", threadPoolSize). 

 CompletableFuture

Another way to utilize Java’s common fork join pool is with CompletableFuture, which provides a lot of tools to help simplify complex asynchronous operations. For simple parallelization it is more complex than ParallelStream:

boolean handleRequest() {
  List<Integer> tasks = asList(0, 1, 2, 3);

  boolean isSuccess = true;
  try {
    tasks.stream()
      .map(x -> CompletableFuture.runAsync(this::writeToDB))
      .collect(toList())
      .forEach(x -> x.join());
   } catch (Exception e) {
    isSuccess = false;
  }
  return isSuccess;
}

 Conclusion

Under certain use cases ParallelStream provides a simple, highly readable solution for managing concurrency.

 
31
Kudos
 
31
Kudos

Now read this

Create an Asynchronous Spring REST API with CompletableFutures and DeferredResult

Spring MVC is a powerful framework for building RESTful APIs. In accordance with the shift to asynchronous programming, Spring introduced the DeferredResult which an endpoint method can return so that its contents are propagated to the... Continue →

Subscribe to Carl Martensen

Don’t worry; we hate spam with a passion.
You can unsubscribe with one click.

F62tWw9EZzv86AwAMvUg