Contents

Using ExecutorService invokeAll with virtual threads

Introduction

InvokeAll is a method on the ExecutorService to start multiple submitted tasks simultaneously. The ExecutorService will use a platform thread from its pool to run the submitted task. Instead of using these expensive and resource-heavy platform threads, we can also use virtual threads to run the tasks submitted to the ExecutorService. This article will cover all the ways to implement the invokeAll method with virtual threads, structured concurrency, and platform threads.

InvokeAll with virtual threads

The first example we are going to look at uses virtual threads. We have a try-with-recourse statement that creates an executor that makes a virtual thread for each task. On line 8, the invokeAll method is called with a list of tasks.

After starting all the tasks we create a stream that will wait for the results and return them.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
static List<String> invokeAllWithVirtualThreads() throws ExecutionException, InterruptedException {
    try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()) {

        var tasks = new ArrayList<Callable<String>>();
        tasks.add(() -> getStringFromResourceA());
        tasks.add(() -> getStringFromResourceB());

        List<Future<String>> futures = executor.invokeAll(tasks);

        return futures.stream().map(f -> {
            try {
                return f.get();
            } catch (InterruptedException | ExecutionException e) {
                throw new RuntimeException(e);
            }
        }).collect(Collectors.toList());
    }
}

InvokeAll with structured concurrency

Structured concurrency is a new way of managing the lifetime of threads in Java. The lifetime is controlled through a StructuredTaskScope. There are mainly two scopes, one with InvokeAny behavior and one with InvokeAll behavior. For this article, we are going to implement InvokeAll. To make a StructuredTaskScope that waits till all threads are finished running, we need to call the ShutdownOnFailure method.

We need to call the fork method to add a task for the scope to execute. The scope.join() method call at line 7 will block till all the threads are done. At line 10, we check if an exception has been thrown before we get the results.

Because we are inside and scope, we will use the new resultNow method to get the results from the futures instead of calling get. This is the new preferred way because we have already waited for the threads to be done.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
static String invokeAllWithStructuredConcurrency() throws ExecutionException, InterruptedException {
    try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
        Future<String> futureA = scope.fork(() -> getStringFromResourceA());
        Future<String> futureB = scope.fork(() -> getStringFromResourceB());

        // wait till all threads are done
        scope.join();

        // throw an exception if one occurred
        scope.throwIfFailed();

        return "result: " + futureA.resultNow() + " " + futureB.resultNow();
    }
}

InvokeAll with platform threads

Platform threads are the threads we know from previous Java versions. These threads are tightly coupled to threads that are managed by the operating system. The example below looks like the first example that uses virtual threads because we use an ExecutorService. This time the ExecutorService we are using is a CachedThreadPool. This pool creates a new thread when needed but will reuse threads that have already finished their task.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
static List<String> invokeAllWithPlatformThreads() throws ExecutionException, InterruptedException {
    try (ExecutorService executor = Executors.newCachedThreadPool()) {

        var tasks = new ArrayList<Callable<String>>();
        tasks.add(() -> getStringFromResourceA());
        tasks.add(() -> getStringFromResourceB());

        List<Future<String>> futures = executor.invokeAll(tasks);

        return futures.stream().map(f -> {
            try {
                return f.get();
            } catch (InterruptedException | ExecutionException e) {
                throw new RuntimeException(e);
            }
        }).collect(Collectors.toList());
    }
}

Conclusion

This article looked at three ways of implementing the invokeAll method. We first used ExecutorService, which creates virtual threads for each task it needs to perform. After that, we saw how to make a StructuredTaskScope that behaves like the invokeAll method. The last example used the platform threads we are already familiar with from previous Java versions.

Further reading

More about virtual threads in Java:

If you want to read more about Java when I write about it – you can follow me on Twitter.